Skip to content

Commit

Permalink
feat(server): Asynchronous server-side background task execution
Browse files Browse the repository at this point in the history
This patch implements the whole support ecosystem for server-side
background tasks, in order to help lessen the load (and blocking) of API
handlers in the web-server for long-running operations.

A **Task** is represented by two things in strict co-existence: a
lightweight, `pickle`-able implementation in the server's code (a
subclass of `AbstractTask`) and a corresponding `BackgroundTask`
database entity, which resides in the "configuration" database (shared
across all products).
A Task is created by API request handlers and then the user is
instructed to retain the `TaskToken`: the task's unique identifier.
Following, the server will dispatch execution of the object into a
background worker process, and keep status synchronisation via the
database.
Even in a service cluster deployment, load balancing will not interfere
with users' ability to query a task's status.

While normal users can only query the status of a single task (which is
usually automatically done by client code, and not the user manually
executing something); product administrators, and especially server
administrators have the ability to query an arbitrary set of tasks using
the potential filters, with a dedicated API function (`getTasks()`) for
this purpose.

Tasks can be cancelled only by `SUPERUSER`s, at which point a special
binary flag is set in the status record.
However, to prevent complicating inter-process communication,
cancellation is supposed to be implemented by `AbstractTask` subclasses
in a co-operative way.
The execution of tasks in a process and a `Task`'s ability to
"communicate" with its execution environment is achieved through the new
`TaskManager` instance, which is created for every process of a server's
deployment.

Unfortunately, tasks can die gracelessly if the server is terminated
(either internally, or even externally).
For this reason, the `DROPPED` status will indicate that the server has
terminated prior to, or during a task's execution, and it was unable to
produce results.
The server was refactored significantly around the handling of subprocesses
in order to support various server shutdown scenarios.

Servers will start `background_worker_processes` number of task handling
subprocesses, which are distinct from the already existing "API
handling" subprocesses.
By default, if unconfigured, `background_worker_processes` is equal to
`worker_processes` (the number of API processes to spawn), which is
equal to `$(nproc)` (CPU count in the system).

This patch includes a `TestingDummyTask` demonstrative subclass of
`AbstractTask` which counts up to an input number of seconds, and each
second it gracefully checks whether it is being killed.
The corresponding testing API endpoint, `createDummyTask()` can specify
whether the task should simulate a failing status.
This endpoint can only be used from, but is used extensively, the unit
testing of the project.

This patch does not include "nice" or "ergonomic" facilities for admins
to manage the tasks, and so far, only the server-side of the
corresponding API calls are supported.
  • Loading branch information
whisperity committed Sep 26, 2024
1 parent f85f771 commit d2bb4d5
Show file tree
Hide file tree
Showing 46 changed files with 2,869 additions and 356 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ jobs:
- name: Install dependencies
run: |
pip install $(grep -iE "pylint|pycodestyle" analyzer/requirements_py/dev/requirements.txt)
- name: Run tests
run: make pylint pycodestyle
- name: Run pycodestyle & pylint
run: make -k pycodestyle pylint

tools:
name: Tools (report-converter, etc.)
Expand Down
39 changes: 1 addition & 38 deletions analyzer/codechecker_analyzer/analysis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
import shutil
import signal
import sys
import time
import traceback
import zipfile

from threading import Timer

import multiprocess
import psutil

from codechecker_common.logger import get_logger
from codechecker_common.process import kill_process_tree
from codechecker_common.review_status_handler import ReviewStatusHandler

from codechecker_statistics_collector.collectors.special_return_value import \
Expand Down Expand Up @@ -341,42 +340,6 @@ def handle_failure(
os.remove(plist_file)


def kill_process_tree(parent_pid, recursive=False):
"""Stop the process tree try it gracefully first.
Try to stop the parent and child processes gracefuly
first if they do not stop in time send a kill signal
to every member of the process tree.
There is a similar function in the web part please
consider to update that in case of changing this.
"""
proc = psutil.Process(parent_pid)
children = proc.children(recursive)

# Send a SIGTERM (Ctrl-C) to the main process
proc.terminate()

# If children processes don't stop gracefully in time,
# slaughter them by force.
_, still_alive = psutil.wait_procs(children, timeout=5)
for p in still_alive:
p.kill()

# Wait until this process is running.
n = 0
timeout = 10
while proc.is_running():
if n > timeout:
LOG.warning("Waiting for process %s to stop has been timed out"
"(timeout = %s)! Process is still running!",
parent_pid, timeout)
break

time.sleep(1)
n += 1


def setup_process_timeout(proc, timeout,
failure_callback=None):
"""
Expand Down
49 changes: 27 additions & 22 deletions bin/CodeChecker
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# -------------------------------------------------------------------------

"""
Used to kickstart CodeChecker.
Save original environment without modifications.
Saves original environment without modifications.
Used to run the logging in the same env.
"""
# This is for enabling CodeChecker as a filename (i.e. module name).
Expand All @@ -25,9 +25,10 @@ import sys
import tempfile

PROC_PID = None
EXIT_CODE = None


def run_codechecker(checker_env, subcommand=None):
def run_codechecker(checker_env, subcommand=None) -> int:
"""
Run the CodeChecker.
* checker_env - CodeChecker will be run in the checker env.
Expand Down Expand Up @@ -63,11 +64,13 @@ def run_codechecker(checker_env, subcommand=None):
global PROC_PID
PROC_PID = proc.pid

proc.wait()
sys.exit(proc.returncode)
global EXIT_CODE
EXIT_CODE = proc.wait()

return EXIT_CODE


def main(subcommand=None):
def main(subcommand=None) -> int:
original_env = os.environ.copy()
checker_env = original_env

Expand All @@ -94,30 +97,32 @@ def main(subcommand=None):
print('Saving original build environment failed.')
print(ex)

def signal_term_handler(signum, _frame):
def signal_handler(signum, _frame):
"""
Forwards the received signal to the CodeChecker subprocess started by
this `main` script.
"""
global PROC_PID
if PROC_PID and sys.platform != "win32":
os.kill(PROC_PID, signal.SIGINT)

_remove_tmp()
sys.exit(128 + signum)

signal.signal(signal.SIGTERM, signal_term_handler)
signal.signal(signal.SIGINT, signal_term_handler)

def signal_reload_handler(_sig, _frame):
global PROC_PID
if PROC_PID:
os.kill(PROC_PID, signal.SIGHUP)
try:
os.kill(PROC_PID, signum)
except ProcessLookupError:
pass

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if sys.platform != "win32":
signal.signal(signal.SIGHUP, signal_reload_handler)
signal.signal(signal.SIGHUP, signal_handler)
signal.signal(signal.SIGCHLD, signal_handler)

try:
run_codechecker(checker_env, subcommand)
global EXIT_CODE
EXIT_CODE = run_codechecker(checker_env, subcommand)
finally:
_remove_tmp()

return EXIT_CODE


if __name__ == "__main__":
main(None)
sys.exit(main(None) or 0)
15 changes: 11 additions & 4 deletions codechecker_common/compatibility/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@
# pylint: disable=no-name-in-module
# pylint: disable=unused-import
if sys.platform in ["darwin", "win32"]:
from multiprocess import Pool # type: ignore
from multiprocess import cpu_count
from multiprocess import \
Pool, Process, \
Queue, \
Value, \
cpu_count
else:
from concurrent.futures import ProcessPoolExecutor as Pool # type: ignore
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import \
Process, \
Queue, \
Value, \
cpu_count
54 changes: 48 additions & 6 deletions codechecker_common/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@
#
# -------------------------------------------------------------------------


import argparse
import datetime
import json
import logging
from logging import config
from pathlib import Path
import os
import sys
from typing import Optional

# The logging leaves can be accesses without
# importing the logging module in other modules.
# The logging leaves can be accesses without importing the logging module in
# other modules.
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
Expand All @@ -25,14 +27,24 @@

CMDLINE_LOG_LEVELS = ['info', 'debug_analyzer', 'debug']

DEBUG_ANALYZER = logging.DEBUG_ANALYZER = 15 # type: ignore
DEBUG_ANALYZER = 15
logging.addLevelName(DEBUG_ANALYZER, 'DEBUG_ANALYZER')


_Levels = {"DEBUG": DEBUG,
"DEBUG_ANALYZER": DEBUG_ANALYZER,
"INFO": INFO,
"WARNING": WARNING,
"ERROR": ERROR,
"CRITICAL": CRITICAL,
"NOTSET": NOTSET,
}


class CCLogger(logging.Logger):
def debug_analyzer(self, msg, *args, **kwargs):
if self.isEnabledFor(logging.DEBUG_ANALYZER):
self._log(logging.DEBUG_ANALYZER, msg, args, **kwargs)
if self.isEnabledFor(DEBUG_ANALYZER):
self._log(DEBUG_ANALYZER, msg, args, **kwargs)


logging.setLoggerClass(CCLogger)
Expand Down Expand Up @@ -113,6 +125,36 @@ def validate_loglvl(log_level):
return log_level


def raw_sprint_log(logger: logging.Logger, level: str, message: str) \
-> Optional[str]:
"""
Formats a raw log `message` using the date format of the specified
`logger`, without actually invoking the logging infrastructure.
"""
if not logger.isEnabledFor(_Levels[level]):
return None

formatter = logger.handlers[0].formatter if len(logger.handlers) > 0 \
else None
datefmt = formatter.datefmt if formatter else None
time = datetime.datetime.now().strftime(datefmt) if datefmt \
else str(datetime.datetime.now())

return f"[{validate_loglvl(level)} {time}] - {message}"


def signal_log(logger: logging.Logger, level: str, message: str):
"""
Simulates a log output and logs a message within a signal handler, without
triggering a `RuntimeError` due to reentrancy in `print`-like method calls.
"""
formatted = raw_sprint_log(logger, level, message)
if not formatted:
return

os.write(sys.stderr.fileno(), f"{formatted}\n".encode())


class LogCfgServer:
"""
Initialize a log configuration server for dynamic log configuration.
Expand Down
49 changes: 49 additions & 0 deletions codechecker_common/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# -------------------------------------------------------------------------
#
# Part of the CodeChecker project, under the Apache License v2.0 with
# LLVM Exceptions. See LICENSE for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# -------------------------------------------------------------------------
import time

import psutil

from .logger import get_logger


LOG = get_logger("system")


def kill_process_tree(parent_pid, recursive=False):
"""
Stop the process tree, gracefully at first.
Try to stop the parent and child processes gracefuly first.
If they do not stop in time, send a kill signal to every member of the
process tree.
"""
proc = psutil.Process(parent_pid)
children = proc.children(recursive)

# Send a SIGTERM to the main process.
proc.terminate()

# If children processes don't stop gracefully in time, slaughter them
# by force.
_, still_alive = psutil.wait_procs(children, timeout=5)
for p in still_alive:
p.kill()

# Wait until this process is running.
n = 0
timeout = 10
while proc.is_running():
if n > timeout:
LOG.warning("Waiting for process %s to stop has been timed out"
"(timeout = %s)! Process is still running!",
parent_pid, timeout)
break

time.sleep(1)
n += 1
19 changes: 19 additions & 0 deletions codechecker_common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
"""
Util module.
"""
import datetime
import hashlib
import itertools
import json
import os
import random
from typing import TextIO

import portalocker
Expand Down Expand Up @@ -112,3 +115,19 @@ def path_for_fake_root(full_path: str, root_path: str = '/') -> str:
def strtobool(value: str) -> bool:
"""Parse a string value to a boolean."""
return value.lower() in ('y', 'yes', 't', 'true', 'on', '1')


def generate_random_token(num_bytes: int = 32) -> str:
"""
Returns a random-generated string usable as a token with `num_bytes`
hexadecimal characters in the output.
"""
prefix = str(os.getpid()).encode()
suffix = str(datetime.datetime.now()).encode()

hash_value = ''.join(
[hashlib.sha256(prefix + os.urandom(num_bytes * 2) + suffix)
.hexdigest()
for _ in range(0, -(num_bytes // -64))])
idx = random.randrange(0, len(hash_value) - num_bytes + 1)
return hash_value[idx:(idx + num_bytes)]
10 changes: 9 additions & 1 deletion docs/web/server_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@ Table of Contents
* [Size of the compilation database](#size-of-the-compilation-database)
* [Authentication](#authentication)

## Number of worker processes
## Number of API worker processes
The `worker_processes` section of the config file controls how many processes
will be started on the server to process API requests.

*Default value*: <CPU count>

The server needs to be restarted if the value is changed in the config file.

### Number of task worker processes
The `background_worker_processes` section of the config file controls how many
processes will be started on the server to process background jobs.

*Default value*: Fallback to same amount as `worker_processes`.

The server needs to be restarted if the value is changed in the config file.

## Run limitation
The `max_run_count` section of the config file controls how many runs can be
stored on the server for a product.
Expand Down
Loading

0 comments on commit d2bb4d5

Please sign in to comment.