diff --git a/docs/releases.rst b/docs/releases.rst
index 5d72fdcf6d..f67575ff3a 100644
--- a/docs/releases.rst
+++ b/docs/releases.rst
@@ -4,6 +4,24 @@ Releases
 ======================
 
+tmt-1.31
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The :ref:`/spec/plans/provision` step is now able to perform
+**provisioning of multiple guests in parallel**. This can
+considerably shorten the time needed for guest provisioning in
+multihost plans. However, whether parallel provisioning takes
+place depends on which provision plugins are involved, because
+not all plugins are compatible with this feature yet. As of
+now, only :ref:`/spec/plans/provision/artemis`,
+:ref:`/spec/plans/provision/connect`,
+:ref:`/spec/plans/provision/container`,
+:ref:`/spec/plans/provision/local`, and
+:ref:`/spec/plans/provision/virtual` are supported. All other
+plugins gracefully fall back to the pre-1.31 behavior and
+provision in sequence.
+
+
 tmt-1.30
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/tests/login/step.sh b/tests/login/step.sh
index c6ada18f88..86aeea5449 100755
--- a/tests/login/step.sh
+++ b/tests/login/step.sh
@@ -27,7 +27,7 @@ rlJournalStart
         rlAssertGrep "interactive" "output"
 
         if [ "$step" = "provision" ]; then
-            rlRun "grep '^    $step$' -A6 output | grep -i interactive"
+            rlRun "grep '^    $step$' -A12 output | grep -i interactive"
         elif [ "$step" = "prepare" ]; then
            rlRun "grep '^    $step$' -A8 output | grep -i interactive"
         elif [ "$step" = "execute" ]; then
diff --git a/tests/multihost/complete/test.sh b/tests/multihost/complete/test.sh
index 89630b6ab6..185293e7af 100755
--- a/tests/multihost/complete/test.sh
+++ b/tests/multihost/complete/test.sh
@@ -14,10 +14,10 @@ function check_current_topology () {
 }
 
 function check_shared_topology () {
-    rlAssertEquals "Guest names" "client-1 client-2 server" "$(yq -r '."guest-names" | join(" ")' $1)"
-    rlAssertEquals "Role names" "client server" "$(yq -r '."role-names" | join(" ")' $1)"
-    rlAssertEquals "Client role guests" "client-1 client-2" "$(yq -r '.roles.client | join(" ")' $1)"
-    rlAssertEquals "Server role guests" "server" "$(yq -r '.roles.server | join(" ")' $1)"
+    rlAssertEquals "Guest names" "client-1 client-2 server" "$(yq -r '."guest-names" | sort | join(" ")' $1)"
+    rlAssertEquals "Role names" "client server" "$(yq -r '."role-names" | sort | join(" ")' $1)"
+    rlAssertEquals "Client role guests" "client-1 client-2" "$(yq -r '.roles.client | sort | join(" ")' $1)"
+    rlAssertEquals "Server role guests" "server" "$(yq -r '.roles.server | sort | join(" ")' $1)"
 
     rlAssertEquals "Guest client-1 name" "client-1" "$(yq -r '.guests["client-1"].name' $1)"
     rlAssertEquals "Guest client-1 role" "client" "$(yq -r '.guests["client-1"].role' $1)"
diff --git a/tmt/queue.py b/tmt/queue.py
index 490445798a..61156f61df 100644
--- a/tmt/queue.py
+++ b/tmt/queue.py
@@ -3,8 +3,7 @@
 from concurrent.futures import Future, ThreadPoolExecutor, as_completed
 from typing import TYPE_CHECKING, Generic, Optional, TypeVar
 
-import fmf.utils
-
+import tmt.utils
 from tmt.log import Logger
 
 if TYPE_CHECKING:
@@ -13,24 +12,24 @@
     from tmt.steps.provision import Guest
 
 
-TaskT = TypeVar('TaskT', bound='_Task')
+TaskResultT = TypeVar('TaskResultT')
 
 
 @dataclasses.dataclass
-class TaskOutcome(Generic[TaskT]):
+class Task(Generic[TaskResultT]):
     """
-    Outcome of a queued task executed on a guest.
+    A base class for queueable actions.
 
-    Bundles together interesting objects related to how the task has been
-    executed, where and what was the result.
+    The class provides not just the tracking, but also the implementation of
+    the action itself. Child classes must implement their action functionality
+    in :py:meth:`go`.
     """
 
-    #: A :py:class:`_Task` instace the outcome relates to.
-    task: TaskT
-
     #: A logger to use for logging events related to the outcome.
     logger: Logger
 
+    result: Optional[TaskResultT]
+
     #: Guest on which the phase was executed. May be unset, some tasks
     #: may handle multiguest actions on their own.
     guest: Optional['Guest']
@@ -39,17 +38,9 @@ class TaskOutcome(Generic[TaskT]):
     #: is saved in this field.
     exc: Optional[Exception]
 
-
-@dataclasses.dataclass
-class _Task:
-    """ A base class for tasks to be executed on one or more guests """
-
-    #: A list of guests to execute the task on.
-    guests: list['Guest']
-
-    #: A logger to use for logging events related to the task. It serves as
-    #: a root logger for new loggers queue may spawn for each guest.
-    logger: Logger
+    #: If set, the task raised :py:class:`SystemExit` exception, and wants to
+    #: terminate the run completely.
+    requested_exit: Optional[SystemExit]
 
     @property
     def name(self) -> str:
@@ -62,98 +53,151 @@ def name(self) -> str:
 
         raise NotImplementedError
 
-    @property
-    def guest_ids(self) -> list[str]:
-        return [guest.multihost_name for guest in self.guests]
+    def go(self) -> Iterator['Self']:
+        """
+        Perform the task.
+
+        Called by :py:class:`Queue` machinery to accomplish the task.
 
-    def go(self) -> Iterator[TaskOutcome['Self']]:
-        """ Perform the task """
+        :yields: instances of the same class, describing invocations of the
+            task and their outcome. The task might be executed multiple times,
+            depending on how exactly it was queued, and the method would yield
+            corresponding results.
+        """
 
         raise NotImplementedError
 
 
-@dataclasses.dataclass
-class GuestlessTask(_Task):
+TaskT = TypeVar('TaskT', bound='Task')  # type: ignore[type-arg]
+
+
+def prepare_loggers(logger: Logger, labels: list[str]) -> dict[str, Logger]:
     """
-    A task that does not run on a particular guest.
+    Create loggers for a set of labels.
+
+    Labels are assumed to come from a group of guests a phase would be
+    executed on: they need to be set to provide context, and they need to
+    be properly aligned for more readable output.
+    """
+
+    loggers: dict[str, Logger] = {}
+
+    # First, spawn all loggers, and set their labels if needed.
+    # Don't bother with labels if there's just a single guest.
+    for label in labels:
+        new_logger = logger.clone()
 
-    .. note::
+        if len(labels) > 1:
+            new_logger.labels.append(label)
+
+        loggers[label] = new_logger
+
+    # Second, find the longest label, and instruct all loggers to pad their
+    # labels to match this length. This should create well-indented messages.
+    max_label_span = max(new_logger.labels_span for new_logger in loggers.values())
+
+    for new_logger in loggers.values():
+        new_logger.labels_padding = max_label_span
+
+    return loggers
+
+
+@dataclasses.dataclass
+class GuestlessTask(Task[TaskResultT]):
+    """
+    A task not assigned to a particular set of guests.
 
-        This may sound unexpected, but there are tasks that need to be part
-        of the queue, but need no specific guest to run on. Usualy, they handle
-        the multihost environment on their own. See :py:class:`tmt.steps.Login`
-        and :py:class:`tmt.steps.Reboot`.
+    An extension of the :py:class:`Task` class, providing a fairly generic
+    wrapper for the actual task which takes care of catching exceptions and
+    proper reporting.
     """
 
-    def run(self, logger: Logger) -> None:
+    def run(self, logger: Logger) -> TaskResultT:
+        """
+        Perform the task.
+
+        Called once from :py:meth:`go`. Subclasses of :py:class:`GuestlessTask`
+        should implement their logic in this method rather than in
+        :py:meth:`go` which is already provided. If your task requires
+        different handling in :py:meth:`go`, it might be better to derive
+        directly from :py:class:`Task`.
+        """
+
         raise NotImplementedError
 
-    def go(self) -> Iterator[TaskOutcome['Self']]:
+    def go(self) -> Iterator['Self']:
+        """
+        Perform the task.
+
+        Called by :py:class:`Queue` machinery to accomplish the task. It
+        expects the child class to implement :py:meth:`run`, with ``go``
+        taking care of task/queue interaction.
+
+        :yields: since the task is not expected to run on multiple guests,
+            only a single instance of the class is yielded to describe the task
+            and its outcome.
+        """
+
         try:
-            self.run(self.logger)
+            self.result = self.run(self.logger)
 
         except Exception as exc:
-            # logger.info('finished', color='cyan')
+            self.result = None
+            self.exc = exc
 
-            yield TaskOutcome(
-                task=self,
-                logger=self.logger,
-                guest=None,
-                exc=exc)
+            yield self
 
         else:
-            # logger.info('finished', color='cyan')
+            self.exc = None
 
-            yield TaskOutcome(
-                task=self,
-                logger=self.logger,
-                guest=None,
-                exc=None)
+            yield self
 
 
 @dataclasses.dataclass
-class Task(_Task):
-    """ A task that should run on multiple guests at the same time """
-
-    def run_on_guest(self, guest: 'Guest', logger: Logger) -> None:
-        raise NotImplementedError
+class MultiGuestTask(Task[TaskResultT]):
+    """
+    A task assigned to a particular set of guests.
 
-    def prepare_loggers(
-            self,
-            logger: Logger) -> dict[str, Logger]:
-        """
-        Create loggers for a set of guests.
+    An extension of the :py:class:`Task` class, providing a fairly generic
+    wrapper for the actual task which takes care of catching exceptions and
+    proper reporting.
+    """
 
-        Guests are assumed to be a group a phase would be executed on, and
-        therefore their labels need to be set, to provide context, plus their
-        labels need to be properly aligned for more readable output.
-        """
+    guests: list['Guest']
 
-        loggers: dict[str, Logger] = {}
+    @tmt.utils.cached_property
+    def guest_ids(self) -> list[str]:
+        return sorted([guest.multihost_name for guest in self.guests])
 
-        # First, spawn all loggers, and set their labels if needed. Don't bother
-        # with labels if there's just a single guest.
-        for guest in self.guests:
-            new_logger = logger.clone()
+    def run_on_guest(self, guest: 'Guest', logger: Logger) -> None:
+        """
+        Perform the task.
 
-            if len(self.guests) > 1:
-                new_logger.labels.append(guest.multihost_name)
+        Called from :py:meth:`go` once for every guest to run on. Subclasses
+        of :py:class:`MultiGuestTask` should implement their logic in this
+        method rather than in :py:meth:`go` which is already provided. If your
+        task requires different handling in :py:meth:`go`, it might be better
+        to derive directly from :py:class:`Task`.
+        """
 
-            loggers[guest.name] = new_logger
+        raise NotImplementedError
 
-        # Second, find the longest labels, and instruct all loggers to pad their
-        # labels to match this length. This should create well-indented messages.
-        max_label_span = max(new_logger.labels_span for new_logger in loggers.values())
+    def go(self) -> Iterator['Self']:
+        """
+        Perform the task.
-        for new_logger in loggers.values():
-            new_logger.labels_padding = max_label_span
+        Called by :py:class:`Queue` machinery to accomplish the task. It
+        expects the child class to implement :py:meth:`run_on_guest`, with
+        ``go`` taking care of task/queue interaction.
 
-        return loggers
+        :yields: instances of the same class, describing invocations of the
+            task and their outcome. For each guest, one instance would be
+            yielded.
+        """
 
-    def go(self) -> Iterator[TaskOutcome['Self']]:
         multiple_guests = len(self.guests) > 1
 
-        new_loggers = self.prepare_loggers(self.logger)
+        new_loggers = prepare_loggers(self.logger, [guest.multihost_name for guest in self.guests])
         old_loggers: dict[str, Logger] = {}
 
         with ThreadPoolExecutor(max_workers=len(self.guests)) as executor:
@@ -171,8 +215,8 @@ def go(self) -> Iterator[TaskOutcome['Self']]:
                 # well, then the phase would pass the given logger to guest
                 # methods when it calls them, propagating the single logger we
                 # prepared...
-                old_loggers[guest.name] = guest._logger
-                new_logger = new_loggers[guest.name]
+                old_loggers[guest.multihost_name] = guest._logger
+                new_logger = new_loggers[guest.multihost_name]
 
                 guest.inject_logger(new_logger)
 
@@ -191,8 +235,8 @@ def go(self) -> Iterator[TaskOutcome['Self']]:
             for future in as_completed(futures):
                 guest = futures[future]
 
-                old_logger = old_loggers[guest.name]
-                new_logger = new_loggers[guest.name]
+                old_logger = old_loggers[guest.multihost_name]
+                new_logger = new_loggers[guest.multihost_name]
 
                 if multiple_guests:
                     new_logger.info('finished', color='cyan')
@@ -202,28 +246,27 @@ def go(self) -> Iterator[TaskOutcome['Self']]:
                 # returned - which is `None` in our case, therefore we can
                 # ignore the return value.
                 try:
-                    future.result()
+                    result = future.result()
+
+                except SystemExit as exc:
+                    task = dataclasses.replace(self, result=None, exc=None, requested_exit=exc)
 
                 except Exception as exc:
-                    yield TaskOutcome(
-                        task=self,
-                        logger=new_logger,
-                        guest=guest,
-                        exc=exc)
+                    task = dataclasses.replace(self, result=None, exc=exc, requested_exit=None)
 
                 else:
-                    yield TaskOutcome(
-                        task=self,
-                        logger=new_logger,
-                        guest=guest,
-                        exc=None)
+                    task = dataclasses.replace(self, result=result, exc=None, requested_exit=None)
+
+                task.guest = guest
+
+                yield task
 
                 # Don't forget to restore the original logger.
                 guest.inject_logger(old_logger)
 
 
 class Queue(list[TaskT]):
-    """ Queue class for running phases on guests """
+    """ Queue class for running tasks """
 
     def __init__(self, name: str, logger: Logger) -> None:
         super().__init__()
@@ -238,15 +281,15 @@ def enqueue_task(self, task: TaskT) -> None:
 
         self._logger.info(
             f'queued {self.name} task #{len(self)}',
-            f'{task.name} on {fmf.utils.listed(task.guest_ids)}',
+            task.name,
             color='cyan')
 
-    def run(self) -> Iterator[TaskOutcome[TaskT]]:
+    def run(self) -> Iterator[TaskT]:
         """
-        Start crunching the queued phases.
+        Start crunching the queued tasks.
 
-        Queued tasks are executed in the order, for each task/guest
-        combination a :py:class:`TaskOutcome` instance is yielded.
+        Tasks are executed in order; for each task/guest
+        combination, a :py:class:`Task` instance is yielded.
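+
+        A minimal usage sketch (``SomeTask`` is a hypothetical
+        :py:class:`Task` subclass standing in for any real one):
+
+        .. code-block:: python
+
+            queue = Queue('demo', logger)
+            queue.enqueue_task(SomeTask(...))
+
+            for task in queue.run():
+                if task.exc:
+                    task.logger.fail(str(task.exc))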
""" for i, task in enumerate(self): @@ -254,17 +297,17 @@ def run(self) -> Iterator[TaskOutcome[TaskT]]: self._logger.info( f'{self.name} task #{i + 1}', - f'{task.name} on {fmf.utils.listed(task.guest_ids)}', + task.name, color='cyan') - failed_outcomes: list[TaskOutcome[TaskT]] = [] + failed_tasks: list[TaskT] = [] for outcome in task.go(): if outcome.exc: - failed_outcomes.append(outcome) + failed_tasks.append(outcome) yield outcome # TODO: make this optional - if failed_outcomes: + if failed_tasks: return diff --git a/tmt/steps/__init__.py b/tmt/steps/__init__.py index ea1d722da2..26b077b3f6 100644 --- a/tmt/steps/__init__.py +++ b/tmt/steps/__init__.py @@ -23,15 +23,16 @@ ) import click +import fmf.utils from click import echo from click.core import ParameterSource import tmt.export import tmt.log import tmt.options +import tmt.queue import tmt.utils from tmt.options import option, show_step_method_hints -from tmt.queue import GuestlessTask, Queue, Task, TaskOutcome from tmt.utils import ( DEFAULT_NAME, EnvironmentType, @@ -49,7 +50,6 @@ ) if TYPE_CHECKING: - from typing_extensions import Self import tmt.base import tmt.cli @@ -1953,14 +1953,24 @@ def push( @dataclasses.dataclass -class QueuedPhase(GuestlessTask, Task, Generic[StepDataT]): - """ A phase to run on one or more guests """ +class ActionTask(tmt.queue.GuestlessTask[None]): + """ A task to run an action """ - phase: Union[Action, Plugin[StepDataT]] + phase: Action + + @property + def name(self) -> str: + return self.phase.name + + def run(self, logger: tmt.log.Logger) -> None: + self.phase.go() - # A cached environment, it will be initialized by `prepare_environment()` - # on the first call. - _environment: Optional[EnvironmentType] = None + +@dataclasses.dataclass +class PluginTask(tmt.queue.MultiGuestTask[None], Generic[StepDataT]): + """ A task to run a phase on a given set of guests """ + + phase: Plugin[StepDataT] @property def phase_name(self) -> str: @@ -1976,78 +1986,70 @@ def phase_name(self) -> str: @property def name(self) -> str: - return self.phase_name - - def run(self, logger: tmt.log.Logger) -> None: - assert isinstance(self.phase, Action) # narrow type - - self.phase.go() + return f'{self.phase_name} ' \ + f'on {fmf.utils.listed(self.guest_ids)}' def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> None: - assert isinstance(self.phase, Plugin) # narrow type + self.phase.go(guest=guest, logger=logger) - self.phase.go( - guest=guest, - logger=logger) - def go(self) -> Iterator[TaskOutcome['Self']]: - # Based on the phase, pick the proper parent class' go() - if isinstance(self.phase, Action): - yield from GuestlessTask.go(self) - - else: - yield from Task.go(self) - - -class PhaseQueue(Queue[QueuedPhase[StepDataT]]): +class PhaseQueue(tmt.queue.Queue[Union[ActionTask, PluginTask[StepDataT]]]): """ Queue class for running phases on guests """ - def enqueue( + def enqueue_action( self, *, - phase: Union[Action, Plugin[StepDataT]], - guests: list['Guest']) -> None: - """ - Add a phase to queue. - - Phase will be executed on given guests, starting at the same time. - - :param phase: phase to run. - :param guests: one or more guests to run the phase on. - """ + phase: Action) -> None: + self.enqueue_task(ActionTask( + logger=phase._logger, + result=None, + guest=None, + exc=None, + requested_exit=None, + phase=phase + )) + def enqueue_plugin( + self, + *, + phase: Plugin[StepDataT], + guests: list['Guest']) -> None: if not guests: raise tmt.utils.MetadataError( f'No guests queued for phase "{phase}". 
A typo in "where" key?') - self.enqueue_task(QueuedPhase( - phase=phase, + self.enqueue_task(PluginTask( + logger=phase._logger, + result=None, + guest=None, + exc=None, + requested_exit=None, guests=guests, - logger=phase._logger + phase=phase )) @dataclasses.dataclass -class PushTask(Task): +class PushTask(tmt.queue.MultiGuestTask[None]): """ Task performing a workdir push to a guest """ @property def name(self) -> str: - return 'push' + return f'push to {fmf.utils.listed(self.guest_ids)}' def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> None: guest.push() @dataclasses.dataclass -class PullTask(Task): +class PullTask(tmt.queue.MultiGuestTask[None]): """ Task performing a workdir pull from a guest """ - source: Optional[Path] = None + source: Optional[Path] @property def name(self) -> str: - return 'pull' + return f'pull from {fmf.utils.listed(self.guest_ids)}' def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> None: guest.pull(source=self.source) @@ -2075,13 +2077,13 @@ def sync_with_guests( :param logger: logger to use for logging. """ - queue: Queue[GuestSyncTaskT] = Queue( + queue: tmt.queue.Queue[GuestSyncTaskT] = tmt.queue.Queue( action, logger.descend(logger_name=action)) queue.enqueue_task(task) - failed_actions: list[TaskOutcome[GuestSyncTaskT]] = [] + failed_actions: list[GuestSyncTaskT] = [] for outcome in queue.run(): if outcome.exc: diff --git a/tmt/steps/execute/__init__.py b/tmt/steps/execute/__init__.py index 64ee27147e..9822998a7c 100644 --- a/tmt/steps/execute/__init__.py +++ b/tmt/steps/execute/__init__.py @@ -5,7 +5,7 @@ import os from contextlib import suppress from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast +from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union, cast import click import fmf @@ -18,9 +18,8 @@ from tmt.checks import CheckEvent from tmt.options import option from tmt.plugins import PluginRegistry -from tmt.queue import TaskOutcome from tmt.result import CheckResult, Result, ResultGuestData, ResultOutcome -from tmt.steps import Action, PhaseQueue, QueuedPhase, Step +from tmt.steps import Action, ActionTask, PhaseQueue, PluginTask, Step from tmt.steps.discover import Discover, DiscoverPlugin, DiscoverStepData from tmt.steps.provision import Guest from tmt.utils import Path, ShellScript, Stopwatch, field @@ -734,13 +733,7 @@ def go(self, force: bool = False) -> None: for phase in self.phases(classes=(Action, ExecutePlugin)): if isinstance(phase, Action): - queue.enqueue( - phase=phase, - guests=[ - guest - for guest in self.plan.provision.guests() - if phase.enabled_on_guest(guest) - ]) + queue.enqueue_action(phase=phase) else: # A single execute plugin is expected to process (potentialy) @@ -754,7 +747,7 @@ def go(self, force: bool = False) -> None: phase_copy = cast(ExecutePlugin[ExecuteStepData], copy.copy(phase)) phase_copy.discover_phase = discover.name - queue.enqueue( + queue.enqueue_plugin( phase=phase_copy, guests=[ guest @@ -762,13 +755,13 @@ def go(self, force: bool = False) -> None: if discover.enabled_on_guest(guest) ]) - failed_phases: list[TaskOutcome[QueuedPhase[ExecuteStepData]]] = [] + failed_tasks: list[Union[ActionTask, PluginTask[ExecuteStepData]]] = [] - for phase_outcome in queue.run(): - if phase_outcome.exc: - phase_outcome.logger.fail(str(phase_outcome.exc)) + for outcome in queue.run(): + if outcome.exc: + outcome.logger.fail(str(outcome.exc)) - failed_phases.append(phase_outcome) + failed_tasks.append(outcome) continue # Execute plugins 
@@ -783,11 +776,11 @@ def go(self, force: bool = False) -> None:
             # access all collected `_results`.
             self._results += execute_phases[0].results()
 
-        if failed_phases:
+        if failed_tasks:
             # TODO: needs a better message...
             raise tmt.utils.GeneralError(
                 'execute step failed',
-                causes=[outcome.exc for outcome in failed_phases if outcome.exc is not None]
+                causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None]
                 )
 
         # To separate "execute" from the follow-up logging visually
diff --git a/tmt/steps/finish/__init__.py b/tmt/steps/finish/__init__.py
index 4fd02881d0..54ab14dc0f 100644
--- a/tmt/steps/finish/__init__.py
+++ b/tmt/steps/finish/__init__.py
@@ -1,6 +1,6 @@
 import copy
 import dataclasses
-from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union, cast
 
 import click
 import fmf
@@ -11,11 +11,11 @@
 from tmt.plugins import PluginRegistry
 from tmt.steps import (
     Action,
+    ActionTask,
     Method,
     PhaseQueue,
+    PluginTask,
     PullTask,
-    QueuedPhase,
-    TaskOutcome,
     sync_with_guests,
     )
 from tmt.steps.provision import Guest
@@ -141,27 +141,31 @@ def go(self, force: bool = False) -> None:
             self._logger.descend(logger_name=f'{self}.queue'))
 
         for phase in self.phases(classes=(Action, FinishPlugin)):
-            queue.enqueue(
-                phase=phase,  # type: ignore[arg-type]
-                guests=[guest for guest in guest_copies if phase.enabled_on_guest(guest)]
-                )
+            if isinstance(phase, Action):
+                queue.enqueue_action(phase=phase)
+
+            else:
+                queue.enqueue_plugin(
+                    phase=phase,  # type: ignore[arg-type]
+                    guests=[guest for guest in guest_copies if phase.enabled_on_guest(guest)]
+                    )
 
-        failed_phases: list[TaskOutcome[QueuedPhase[FinishStepData]]] = []
+        failed_tasks: list[Union[ActionTask, PluginTask[FinishStepData]]] = []
 
-        for phase_outcome in queue.run():
-            if not isinstance(phase_outcome.task.phase, FinishPlugin):
+        for outcome in queue.run():
+            if not isinstance(outcome.phase, FinishPlugin):
                 continue
 
-            if phase_outcome.exc:
-                phase_outcome.logger.fail(str(phase_outcome.exc))
+            if outcome.exc:
+                outcome.logger.fail(str(outcome.exc))
 
-                failed_phases.append(phase_outcome)
+                failed_tasks.append(outcome)
 
                 continue
 
-        if failed_phases:
+        if failed_tasks:
             raise tmt.utils.GeneralError(
                 'finish step failed',
-                causes=[outcome.exc for outcome in failed_phases if outcome.exc is not None]
+                causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None]
                 )
 
         # To separate "finish" from "pull" queue visually
@@ -174,9 +178,14 @@ def go(self, force: bool = False) -> None:
             self,
             'pull',
             PullTask(
-                guests=guest_copies,
                 logger=self._logger,
-                source=self.plan.data_directory),
+                result=None,
+                guest=None,
+                exc=None,
+                requested_exit=None,
+                guests=guest_copies,
+                source=self.plan.data_directory
+                ),
             self._logger)
 
         # To separate "finish" from "pull" queue visually
diff --git a/tmt/steps/prepare/__init__.py b/tmt/steps/prepare/__init__.py
index 58d0a78ea6..89fea377ef 100644
--- a/tmt/steps/prepare/__init__.py
+++ b/tmt/steps/prepare/__init__.py
@@ -1,7 +1,14 @@
 import collections
 import copy
 import dataclasses
-from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Optional,
+    TypeVar,
+    Union,
+    cast,
+    )
 
 import click
 import fmf
@@ -14,13 +21,13 @@
 import tmt.utils
 from tmt.options import option
 from tmt.plugins import PluginRegistry
-from tmt.queue import TaskOutcome
 from tmt.steps import (
     Action,
+    ActionTask,
     PhaseQueue,
+    PluginTask,
     PullTask,
     PushTask,
-    QueuedPhase,
     sync_with_guests,
     )
 from tmt.steps.provision import Guest
@@ -245,7 +252,14 @@ def go(self, force: bool = False) -> None:
         sync_with_guests(
             self,
             'push',
-            PushTask(guests=guest_copies, logger=self._logger),
+            PushTask(
+                logger=self._logger,
+                result=None,
+                guest=None,
+                exc=None,
+                requested_exit=None,
+                guests=guest_copies
+                ),
             self._logger)
 
         # To separate "push" from "prepare" queue visually
@@ -256,30 +270,34 @@ def go(self, force: bool = False) -> None:
             self._logger.descend(logger_name=f'{self}.queue'))
 
         for phase in self.phases(classes=(Action, PreparePlugin)):
-            queue.enqueue(
-                phase=phase,  # type: ignore[arg-type]
-                guests=[guest for guest in guest_copies if phase.enabled_on_guest(guest)]
-                )
+            if isinstance(phase, Action):
+                queue.enqueue_action(phase=phase)
 
-        failed_phases: list[TaskOutcome[QueuedPhase[PrepareStepData]]] = []
+            else:
+                queue.enqueue_plugin(
+                    phase=phase,  # type: ignore[arg-type]
+                    guests=[guest for guest in guest_copies if phase.enabled_on_guest(guest)]
+                    )
 
-        for phase_outcome in queue.run():
-            if not isinstance(phase_outcome.task.phase, PreparePlugin):
+        failed_tasks: list[Union[ActionTask, PluginTask[PrepareStepData]]] = []
+
+        for outcome in queue.run():
+            if not isinstance(outcome.phase, PreparePlugin):
                 continue
 
-            if phase_outcome.exc:
-                phase_outcome.logger.fail(str(phase_outcome.exc))
+            if outcome.exc:
+                outcome.logger.fail(str(outcome.exc))
 
-                failed_phases.append(phase_outcome)
+                failed_tasks.append(outcome)
 
                 continue
 
             self.preparations_applied += 1
 
-        if failed_phases:
+        if failed_tasks:
             # TODO: needs a better message...
             raise tmt.utils.GeneralError(
                 'prepare step failed',
-                causes=[outcome.exc for outcome in failed_phases if outcome.exc is not None]
+                causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None]
                 )
 
         self.info('')
@@ -291,8 +309,12 @@ def go(self, force: bool = False) -> None:
             self,
             'pull',
             PullTask(
-                guests=guest_copies,
                 logger=self._logger,
+                result=None,
+                guest=None,
+                exc=None,
+                requested_exit=None,
+                guests=guest_copies,
                 source=self.plan.data_directory),
             self._logger)
 
diff --git a/tmt/steps/provision/__init__.py b/tmt/steps/provision/__init__.py
index cf8df64bf4..8e8139e985 100644
--- a/tmt/steps/provision/__init__.py
+++ b/tmt/steps/provision/__init__.py
@@ -11,6 +11,7 @@
 import subprocess
 import tempfile
 from collections.abc import Iterator
+from concurrent.futures import Future, ThreadPoolExecutor, as_completed
 from shlex import quote
 from typing import (
     TYPE_CHECKING,
@@ -25,17 +26,20 @@
 
 import click
 import fmf
+import fmf.utils
 from click import echo
 
 import tmt
 import tmt.hardware
 import tmt.log
 import tmt.plugins
+import tmt.queue
 import tmt.steps
 import tmt.utils
+from tmt.log import Logger
 from tmt.options import option
 from tmt.plugins import PluginRegistry
-from tmt.steps import Action
+from tmt.steps import Action, ActionTask, PhaseQueue
 from tmt.utils import (
     Command,
     Path,
@@ -1703,6 +1707,11 @@ class ProvisionPlugin(tmt.steps.GuestlessPlugin[ProvisionStepDataT]):
     _data_class = ProvisionStepData  # type: ignore[assignment]
     _guest_class = Guest
 
+    #: If set, the plugin can be asked to provision in multiple threads at the
+    #: same time. Plugins that do not support parallel provisioning should keep
+    #: this set to ``False``.
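+    #:
+    #: See e.g. ``ProvisionArtemis`` or ``ProvisionTestcloud`` below for
+    #: plugins that opt in by setting this to ``True``.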
+    _thread_safe: bool = False
+
     # Default implementation for provision is a virtual machine
     how = 'virtual'
 
@@ -1819,6 +1828,118 @@ def show(self, keys: Optional[list[str]] = None) -> None:
             echo(tmt.utils.format('hardware', tmt.utils.dict_to_yaml(hardware.to_spec())))
 
 
+@dataclasses.dataclass
+class ProvisionTask(tmt.queue.GuestlessTask[None]):
+    """ A task to run provisioning of multiple guests """
+
+    #: Phases describing guests to provision. In the ``provision`` step,
+    #: each phase describes one guest.
+    phases: list[ProvisionPlugin[ProvisionStepData]]
+
+    #: When a ``ProvisionTask`` instance is received from the queue, ``phase``
+    #: points to the phase that has been provisioned by the task.
+    phase: Optional[ProvisionPlugin[ProvisionStepData]] = None
+
+    @property
+    def name(self) -> str:
+        return cast(str, fmf.utils.listed([phase.name for phase in self.phases]))
+
+    def go(self) -> Iterator['ProvisionTask']:
+        multiple_guests = len(self.phases) > 1
+
+        new_loggers = tmt.queue.prepare_loggers(
+            self.logger,
+            [phase.name for phase in self.phases])
+        old_loggers: dict[str, Logger] = {}
+
+        with ThreadPoolExecutor(max_workers=len(self.phases)) as executor:
+            futures: dict[Future[None], ProvisionPlugin[ProvisionStepData]] = {}
+
+            for phase in self.phases:
+                old_loggers[phase.name] = phase._logger
+                new_logger = new_loggers[phase.name]
+
+                phase.inject_logger(new_logger)
+
+                if multiple_guests:
+                    new_logger.info('started', color='cyan')
+
+                # Submit each phase as a distinct job for the executor pool...
+                futures[
+                    executor.submit(phase.go)
+                    ] = phase
+
+            # ... and then sit and wait as they get delivered to us as they
+            # finish.
+            for future in as_completed(futures):
+                phase = futures[future]
+
+                old_logger = old_loggers[phase.name]
+                new_logger = new_loggers[phase.name]
+
+                if multiple_guests:
+                    new_logger.info('finished', color='cyan')
+
+                # `Future.result()` will either 1. reraise an exception the
+                # callable raised, if any, or 2. return whatever the callable
+                # returned - which is `None` in our case, therefore we can
+                # ignore the return value.
+                try:
+                    future.result()
+
+                except SystemExit as exc:
+                    yield ProvisionTask(
+                        logger=new_logger,
+                        result=None,
+                        guest=None,
+                        exc=None,
+                        requested_exit=exc,
+                        phases=[]
+                        )
+
+                except Exception as exc:
+                    yield ProvisionTask(
+                        logger=new_logger,
+                        result=None,
+                        guest=None,
+                        exc=exc,
+                        requested_exit=None,
+                        phases=[]
+                        )
+
+                else:
+                    yield ProvisionTask(
+                        logger=new_logger,
+                        result=None,
+                        guest=phase.guest(),
+                        exc=None,
+                        requested_exit=None,
+                        phases=[],
+                        phase=phase
+                        )
+
+                # Don't forget to restore the original logger.
+                phase.inject_logger(old_logger)
+
+
+class ProvisionQueue(tmt.queue.Queue[ProvisionTask]):
+    """ Queue class for running provisioning tasks """
+
+    def enqueue(
+            self,
+            *,
+            phases: list[ProvisionPlugin[ProvisionStepData]],
+            logger: Logger) -> None:
+        self.enqueue_task(ProvisionTask(
+            logger=logger,
+            result=None,
+            guest=None,
+            exc=None,
+            requested_exit=None,
+            phases=phases
+            ))
+
+
 class Provision(tmt.steps.Step):
     """ Provision an environment for testing or use localhost. """
 
""" @@ -1841,7 +1962,10 @@ def __init__( # List of provisioned guests and loaded guest data self._guests: list[Guest] = [] self._guest_data: dict[str, GuestData] = {} - self.is_multihost = False + + @property + def is_multihost(self) -> bool: + return len(self.data) > 1 def load(self) -> None: """ Load guest data from the workdir """ @@ -1918,45 +2042,165 @@ def go(self, force: bool = False) -> None: # Provision guests self._guests = [] - save = True - self.is_multihost = sum(isinstance(phase, ProvisionPlugin) for phase in self.phases()) > 1 - try: - for phase in self.phases(classes=(Action, ProvisionPlugin)): - try: - if isinstance(phase, Action): - phase.go() - elif isinstance(phase, ProvisionPlugin): - phase.go() + def _run_provision_phases( + phases: list[ProvisionPlugin[ProvisionStepData]] + ) -> tuple[list[ProvisionTask], list[ProvisionTask]]: + """ + Run the given set of ``provision`` phases. - guest = phase.guest() - if guest: - guest.show() + :param phases: list of ``provision`` step phases. By "running" them, + they would provision their respective guests. + :returns: two lists, a list of all :py:class:`ProvisionTask` + instances queued, and a subset of the first list collecting only + those tasks that failed. + """ - if self.is_multihost: - self.info('') - except (tmt.utils.RunError, tmt.utils.ProvisionError) as error: - self.fail(str(error)) - raise - finally: - if isinstance(phase, ProvisionPlugin): - guest = phase.guest() - if guest and (guest.is_ready or self.is_dry_run): - self._guests.append(guest) + queue: ProvisionQueue = ProvisionQueue( + 'provision.provision', + self._logger.descend(logger_name=f'{self}.queue')) - # Give a summary, update status and save - self.summary() - self.status('done') - except (SystemExit, tmt.utils.SpecificationError) as error: - # A plugin will only raise SystemExit if the exit is really desired - # and no other actions should be done. An example of this is - # listing available images. In such case, the workdir is deleted - # as it's redundant and save() would throw an error. - save = False - raise error - finally: - if save: - self.save() + queue.enqueue(phases=phases, logger=queue._logger) + + all_tasks: list[ProvisionTask] = [] + failed_tasks: list[ProvisionTask] = [] + + for outcome in queue.run(): + all_tasks.append(outcome) + + if outcome.exc: + outcome.logger.fail(str(outcome.exc)) + + failed_tasks.append(outcome) + + continue + + guest = outcome.guest + + if guest: + guest.show() + + if guest.is_ready or self.is_dry_run: + self._guests.append(guest) + + return all_tasks, failed_tasks + + def _run_action_phases(phases: list[Action]) -> tuple[list[ActionTask], list[ActionTask]]: + """ + Run the given set of actions. + + :param phases: list of actions, e.g. ``login`` or ``reboot``, given + in the ``provision`` step. + :returns: two lists, a list of all :py:class:`ActionTask` instances + queued, and a subset of the first list collecting only those + tasks that failed. + """ + + queue: PhaseQueue[ProvisionStepData] = PhaseQueue( + 'provision.action', + self._logger.descend(logger_name=f'{self}.queue')) + + for action in phases: + queue.enqueue_action(phase=action) + + all_tasks: list[ActionTask] = [] + failed_tasks: list[ActionTask] = [] + + for outcome in queue.run(): + assert isinstance(outcome, ActionTask) + + all_tasks.append(outcome) + + if outcome.exc: + outcome.logger.fail(str(outcome.exc)) + + failed_tasks.append(outcome) + + return all_tasks, failed_tasks + + # Provisioning phases may be intermixed with actions. 
+        # phases and actions in a consistent manner, we will process them in
+        # the order of their `order` key. We will group provisioning phases
+        # not interrupted by an action into batches, and run the phases of
+        # each batch in parallel. For example, phases `virtual`, `virtual`,
+        # `login` and `connect` would form three batches: the two `virtual`
+        # phases provisioned in parallel, then the `login` action, then the
+        # `connect` phase on its own.
+        all_phases = self.phases(classes=(Action, ProvisionPlugin))
+        all_phases.sort(key=lambda x: x.order)
+
+        all_outcomes: list[Union[ActionTask, ProvisionTask]] = []
+        failed_outcomes: list[Union[ActionTask, ProvisionTask]] = []
+
+        while all_phases:
+            # Start looking for sequences of phases of the same kind. Collect
+            # as many as possible, until hitting a different one.
+            phase = all_phases.pop(0)
+
+            if isinstance(phase, Action):
+                action_phases: list[Action] = [phase]
+
+                while all_phases and isinstance(all_phases[0], Action):
+                    action_phases.append(cast(Action, all_phases.pop(0)))
+
+                all_action_outcomes, failed_action_outcomes = _run_action_phases(action_phases)
+
+                all_outcomes += all_action_outcomes
+                failed_outcomes += failed_action_outcomes
+
+            else:
+                plugin_phases: list[ProvisionPlugin[ProvisionStepData]] = [
+                    phase]  # type: ignore[list-item]
+
+                # ignore[attr-defined]: mypy does not recognize `phase` as `ProvisionPlugin`.
+                if phase._thread_safe:  # type: ignore[attr-defined]
+                    while all_phases:
+                        if not isinstance(all_phases[0], ProvisionPlugin):
+                            break
+
+                        if not all_phases[0]._thread_safe:
+                            break
+
+                        plugin_phases.append(
+                            cast(
+                                ProvisionPlugin[ProvisionStepData],
+                                all_phases.pop(0)))
+
+                all_plugin_outcomes, failed_plugin_outcomes = _run_provision_phases(
+                    plugin_phases)
+
+                all_outcomes += all_plugin_outcomes
+                failed_outcomes += failed_plugin_outcomes
+
+        # A plugin will only raise SystemExit if the exit is really desired
+        # and no other actions should be done. An example of this is
+        # listing available images. In such case, the workdir is deleted
+        # as it's redundant and save() would throw an error.
+        #
+        # TODO: in theory, there may be many, many plugins raising `SystemExit`
+        # but we can re-raise just a single one. It would be better to not use
+        # an exception to signal this, but rather set/return a special object,
+        # leaving the materialization into `SystemExit` to the step and/or tmt.
+        # Or not do any one-shot actions under the disguise of provisioning...
+
+        exiting_tasks = [
+            outcome for outcome in all_outcomes if outcome.requested_exit is not None
+            ]
+
+        if exiting_tasks:
+            assert exiting_tasks[0].requested_exit is not None
+
+            raise exiting_tasks[0].requested_exit
+
+        if failed_outcomes:
+            raise tmt.utils.GeneralError(
+                'provision step failed',
+                causes=[outcome.exc for outcome in failed_outcomes if outcome.exc is not None]
+                )
+
+        # To separate "provision" from the follow-up logging visually
+        self.info('')
+
+        # Give a summary, update status and save
+        self.summary()
+        self.status('done')
+        self.save()
 
     def guests(self) -> list[Guest]:
         """ Return the list of all provisioned guests """
diff --git a/tmt/steps/provision/artemis.py b/tmt/steps/provision/artemis.py
index a819853bac..3c35372dbd 100644
--- a/tmt/steps/provision/artemis.py
+++ b/tmt/steps/provision/artemis.py
@@ -690,6 +690,8 @@ class ProvisionArtemis(tmt.steps.provision.ProvisionPlugin[ProvisionArtemisData]
     _data_class = ProvisionArtemisData
     _guest_class = GuestArtemis
 
+    _thread_safe = True
+
     # Guest instance
     _guest = None
 
diff --git a/tmt/steps/provision/connect.py b/tmt/steps/provision/connect.py
index c968f62b3a..c9c80c90e0 100644
--- a/tmt/steps/provision/connect.py
+++ b/tmt/steps/provision/connect.py
@@ -160,6 +160,8 @@ class ProvisionConnect(tmt.steps.provision.ProvisionPlugin[ProvisionConnectData]
     _data_class = ProvisionConnectData
     _guest_class = GuestConnect
 
+    _thread_safe = True
+
     # Guest instance
     _guest = None
 
diff --git a/tmt/steps/provision/local.py b/tmt/steps/provision/local.py
index ae59f6f25d..4f0bfb0c36 100644
--- a/tmt/steps/provision/local.py
+++ b/tmt/steps/provision/local.py
@@ -154,6 +154,8 @@ class ProvisionLocal(tmt.steps.provision.ProvisionPlugin[ProvisionLocalData]):
     _data_class = ProvisionLocalData
     _guest_class = GuestLocal
 
+    _thread_safe = True
+
     # Guest instance
     _guest = None
 
diff --git a/tmt/steps/provision/mrack.py b/tmt/steps/provision/mrack.py
index fea91cd01f..ab81b491e8 100644
--- a/tmt/steps/provision/mrack.py
+++ b/tmt/steps/provision/mrack.py
@@ -680,6 +680,8 @@ class ProvisionBeaker(tmt.steps.provision.ProvisionPlugin[ProvisionBeakerData]):
     _data_class = ProvisionBeakerData
     _guest_class = GuestBeaker
 
+    # _thread_safe = True
+
     # Guest instance
     _guest = None
 
diff --git a/tmt/steps/provision/podman.py b/tmt/steps/provision/podman.py
index a4f57266a0..5c08a31849 100644
--- a/tmt/steps/provision/podman.py
+++ b/tmt/steps/provision/podman.py
@@ -425,6 +425,8 @@ class ProvisionPodman(tmt.steps.provision.ProvisionPlugin[ProvisionPodmanData]):
     _data_class = ProvisionPodmanData
     _guest_class = GuestContainer
 
+    _thread_safe = True
+
     # Guest instance
     _guest = None
 
diff --git a/tmt/steps/provision/testcloud.py b/tmt/steps/provision/testcloud.py
index 8e787d7215..0b6fd82f08 100644
--- a/tmt/steps/provision/testcloud.py
+++ b/tmt/steps/provision/testcloud.py
@@ -4,6 +4,7 @@
 import os
 import platform
 import re
+import threading
 import time
 import types
 from typing import TYPE_CHECKING, Any, Optional, Union, cast
@@ -336,6 +337,11 @@ class GuestTestcloud(tmt.GuestSsh):
     _domain: Optional[  # type: ignore[name-defined]
         'testcloud.domain_configuration.DomainConfiguration'] = None
 
+    #: The lock protects calls into the testcloud library. We suspect it might
+    #: be unprepared for multi-threaded use. After the dust settles, we may
+    #: remove the lock.
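+    #:
+    #: Code calling into testcloud or libvirt therefore takes the lock
+    #: first, e.g. ``with GuestTestcloud._testcloud_lock: ...``.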
+    _testcloud_lock = threading.Lock()
+
     @property
     def is_ready(self) -> bool:
         if self._instance is None:
@@ -343,14 +349,17 @@ def is_ready(self) -> bool:
 
         assert testcloud is not None
         assert libvirt is not None
-        try:
-            state = testcloud.instance._find_domain(self._instance.name, self._instance.connection)
-            # Note the type of variable 'state' is 'Any'. Hence, we don't use:
-            #     return state == 'running'
-            # to avoid error from type checking.
-            return bool(state == "running")
-        except libvirt.libvirtError:
-            return False
+
+        with GuestTestcloud._testcloud_lock:
+            try:
+                state = testcloud.instance._find_domain(
+                    self._instance.name, self._instance.connection)
+                # Note the type of variable 'state' is 'Any'. Hence, we don't use:
+                #     return state == 'running'
+                # to avoid error from type checking.
+                return bool(state == "running")
+            except libvirt.libvirtError:
+                return False
 
     def _get_url(self, url: str, message: str) -> requests.Response:
         """ Get url, retry when fails, return response """
@@ -388,10 +397,11 @@ def _guess_image_url(self, name: str) -> str:
         url: Optional[str] = None
         assert testcloud is not None
 
-        try:
-            url = testcloud.util.get_image_url(name.lower().strip(), self.arch)
-        except Exception as error:
-            raise ProvisionError("Could not get image url.") from error
+        with GuestTestcloud._testcloud_lock:
+            try:
+                url = testcloud.util.get_image_url(name.lower().strip(), self.arch)
+            except Exception as error:
+                raise ProvisionError("Could not get image url.") from error
 
         if not url:
             raise ProvisionError(f"Could not map '{name}' to compose.")
@@ -404,12 +414,15 @@ def wake(self) -> None:
                    level=2, shift=0)
         self.prepare_config()
         assert testcloud is not None
-        self._image = testcloud.image.Image(self.image_url)
+        with GuestTestcloud._testcloud_lock:
+            self._image = testcloud.image.Image(self.image_url)
         if self.instance_name is None:
             raise ProvisionError(f"The instance name '{self.instance_name}' is invalid.")
-        self._instance = testcloud.instance.Instance(
-            self.instance_name, image=self._image,
-            connection=f"qemu:///{self.connection}", desired_arch=self.arch)
+
+        with GuestTestcloud._testcloud_lock:
+            self._instance = testcloud.instance.Instance(
+                self.instance_name, image=self._image,
+                connection=f"qemu:///{self.connection}", desired_arch=self.arch)
 
     def prepare_ssh_key(self, key_type: Optional[str] = None) -> None:
         """ Prepare ssh key for authentication """
@@ -445,7 +458,8 @@ def prepare_config(self) -> None:
 
         # Get configuration
         assert testcloud is not None
-        self.config = testcloud.config.get_config()
+        with GuestTestcloud._testcloud_lock:
+            self.config = testcloud.config.get_config()
 
         self.debug(f"testcloud version: {testcloud.__version__}")
 
@@ -601,23 +615,24 @@ def start(self) -> None:
 
         # Initialize and prepare testcloud image
         assert testcloud is not None
-        self._image = testcloud.image.Image(self.image_url)
-        self.verbose('qcow', self._image.name, 'green')
-        if not Path(self._image.local_path).exists():
-            self.info('progress', 'downloading...', 'cyan')
-            try:
-                self._image.prepare()
-            except FileNotFoundError as error:
-                raise ProvisionError(
-                    f"Image '{self._image.local_path}' not found.") from error
-            except (testcloud.exceptions.TestcloudPermissionsError,
-                    PermissionError) as error:
-                raise ProvisionError(
-                    f"Failed to prepare the image. Check the '{TESTCLOUD_IMAGES}' "
-                    f"directory permissions.") from error
-            except KeyError as error:
-                raise ProvisionError(
-                    f"Failed to prepare image '{self.image_url}'.") from error
+        with GuestTestcloud._testcloud_lock:
+            self._image = testcloud.image.Image(self.image_url)
+            self.verbose('qcow', self._image.name, 'green')
+            if not Path(self._image.local_path).exists():
+                self.info('progress', 'downloading...', 'cyan')
+                try:
+                    self._image.prepare()
+                except FileNotFoundError as error:
+                    raise ProvisionError(
+                        f"Image '{self._image.local_path}' not found.") from error
+                except (testcloud.exceptions.TestcloudPermissionsError,
+                        PermissionError) as error:
+                    raise ProvisionError(
+                        f"Failed to prepare the image. Check the '{TESTCLOUD_IMAGES}' "
+                        f"directory permissions.") from error
+                except KeyError as error:
+                    raise ProvisionError(
+                        f"Failed to prepare image '{self.image_url}'.") from error
 
         # Prepare hostname (get rid of possible unwanted characters)
         hostname = re.sub(r"[^a-zA-Z0-9\-]+", "-", self.name.lower()).strip("-")
@@ -652,88 +667,91 @@ def start(self) -> None:
 
         # Is the combination of host-requested architecture kvm capable?
         kvm = bool(self.arch == platform.machine() and os.path.exists("/dev/kvm"))
 
-        # Is this el <= 7?
-        legacy_os = testcloud.util.needs_legacy_net(self._image.name)
-
-        # Is this a CoreOS?
-        self._domain.coreos = bool(re.search('coreos|rhcos', self.image.lower()))
-
-        if self.arch == "x86_64":
-            self._domain.system_architecture = X86_64ArchitectureConfiguration(
-                kvm=kvm,
-                uefi=False,  # Configurable
-                model="q35" if not legacy_os else "pc")
-        elif self.arch == "aarch64":
-            self._domain.system_architecture = AArch64ArchitectureConfiguration(
-                kvm=kvm,
-                uefi=True,  # Always enabled
-                model="virt")
-        elif self.arch == "ppc64le":
-            self._domain.system_architecture = Ppc64leArchitectureConfiguration(
-                kvm=kvm,
-                uefi=False,  # Always disabled
-                model="pseries")
-        elif self.arch == "s390x":
-            self._domain.system_architecture = S390xArchitectureConfiguration(
-                kvm=kvm,
-                uefi=False,  # Always disabled
-                model="s390-ccw-virtio")
-        else:
-            raise tmt.utils.ProvisionError("Unknown architecture requested.")
-
-        mac_address = testcloud.util.generate_mac_address()
-        if f"qemu:///{self.connection}" == "qemu:///system":
-            self._domain.network_configuration = SystemNetworkConfiguration(
-                mac_address=mac_address)
-        elif f"qemu:///{self.connection}" == "qemu:///session":
-            device_type = "virtio-net-pci" if not legacy_os else "e1000"
-            self._domain.network_configuration = UserNetworkConfiguration(
-                mac_address=mac_address,
-                port=testcloud.util.spawn_instance_port_file(self.instance_name),
-                device_type=device_type)
-        else:
-            raise tmt.utils.ProvisionError("Only system, or session connection is supported.")
-
-        self._domain.storage_devices.append(storage_image)
-
-        if not self._domain.coreos:
-            seed_disk = RawStorageDevice(self._domain.seed_path)
-            self._domain.storage_devices.append(seed_disk)
-
-        self._instance = testcloud.instance.Instance(
-            hostname=hostname,
-            image=self._image,
-            connection=f"qemu:///{self.connection}",
-            domain_configuration=self._domain)
-        self.verbose('name', self.instance_name, 'green')
-
-        # Decide if we want to multiply timeouts when emulating an architecture
-        time_coeff = NON_KVM_TIMEOUT_COEF if not kvm else 1
-
-        # Prepare ssh key
-        # TODO: Maybe... some better way to do this?
-        if self._domain.coreos:
-            self._instance.coreos = True
-            # prepare_ssh_key() writes key directly to COREOS_DATA
-            self._instance.ssh_path = []
-        self.prepare_ssh_key(SSH_KEYGEN_TYPE)
-
-        # Boot the virtual machine
-        self.info('progress', 'booting...', 'cyan')
-        assert libvirt is not None
-        try:
-            self._instance.prepare()
-            self._instance.spawn_vm()
-            self._instance.start(DEFAULT_BOOT_TIMEOUT * time_coeff)
-        except (testcloud.exceptions.TestcloudInstanceError,
-                libvirt.libvirtError) as error:
-            raise ProvisionError(
-                f'Failed to boot testcloud instance ({error}).')
-        self.guest = self._instance.get_ip()
-        self.port = int(self._instance.get_instance_port())
-        self.verbose('ip', self.guest, 'green')
-        self.verbose('port', self.port, 'green')
-        self._instance.create_ip_file(self.guest)
+        with GuestTestcloud._testcloud_lock:
+            # Is this el <= 7?
+            legacy_os = testcloud.util.needs_legacy_net(self._image.name)
+
+            # Is this a CoreOS?
+            self._domain.coreos = bool(re.search('coreos|rhcos', self.image.lower()))
+
+            if self.arch == "x86_64":
+                self._domain.system_architecture = X86_64ArchitectureConfiguration(
+                    kvm=kvm,
+                    uefi=False,  # Configurable
+                    model="q35" if not legacy_os else "pc")
+            elif self.arch == "aarch64":
+                self._domain.system_architecture = AArch64ArchitectureConfiguration(
+                    kvm=kvm,
+                    uefi=True,  # Always enabled
+                    model="virt")
+            elif self.arch == "ppc64le":
+                self._domain.system_architecture = Ppc64leArchitectureConfiguration(
+                    kvm=kvm,
+                    uefi=False,  # Always disabled
+                    model="pseries")
+            elif self.arch == "s390x":
+                self._domain.system_architecture = S390xArchitectureConfiguration(
+                    kvm=kvm,
+                    uefi=False,  # Always disabled
+                    model="s390-ccw-virtio")
+            else:
+                raise tmt.utils.ProvisionError("Unknown architecture requested.")
+
+            mac_address = testcloud.util.generate_mac_address()
+            if f"qemu:///{self.connection}" == "qemu:///system":
+                self._domain.network_configuration = SystemNetworkConfiguration(
+                    mac_address=mac_address)
+            elif f"qemu:///{self.connection}" == "qemu:///session":
+                device_type = "virtio-net-pci" if not legacy_os else "e1000"
+                self._domain.network_configuration = UserNetworkConfiguration(
+                    mac_address=mac_address,
+                    port=testcloud.util.spawn_instance_port_file(self.instance_name),
+                    device_type=device_type)
+            else:
+                raise tmt.utils.ProvisionError("Only system, or session connection is supported.")
+
+            self._domain.storage_devices.append(storage_image)
+
+            if not self._domain.coreos:
+                seed_disk = RawStorageDevice(self._domain.seed_path)
+                self._domain.storage_devices.append(seed_disk)
+
+            self._instance = testcloud.instance.Instance(
+                hostname=hostname,
+                image=self._image,
+                connection=f"qemu:///{self.connection}",
+                domain_configuration=self._domain)
+
+        self.verbose('name', self.instance_name, 'green')
+
+        # Decide if we want to multiply timeouts when emulating an architecture
+        time_coeff = NON_KVM_TIMEOUT_COEF if not kvm else 1
+
+        # Prepare ssh key
+        # TODO: Maybe... some better way to do this?
+        if self._domain.coreos:
+            self._instance.coreos = True
+            # prepare_ssh_key() writes key directly to COREOS_DATA
+            self._instance.ssh_path = []
+        self.prepare_ssh_key(SSH_KEYGEN_TYPE)
+
+        # Boot the virtual machine
+        self.info('progress', 'booting...', 'cyan')
+        assert libvirt is not None
+
+        try:
+            self._instance.prepare()
+            self._instance.spawn_vm()
+            self._instance.start(DEFAULT_BOOT_TIMEOUT * time_coeff)
+        except (testcloud.exceptions.TestcloudInstanceError,
+                libvirt.libvirtError) as error:
+            raise ProvisionError(
+                f'Failed to boot testcloud instance ({error}).')
+        self.guest = self._instance.get_ip()
+        self.port = int(self._instance.get_instance_port())
+        self.verbose('ip', self.guest, 'green')
+        self.verbose('port', self.port, 'green')
+        self._instance.create_ip_file(self.guest)
 
         # Wait a bit until the box is up
         if not self.reconnect(
@@ -756,22 +774,28 @@ def stop(self) -> None:
         if self._instance and self.guest:
             self.debug(f"Stopping testcloud instance '{self.instance_name}'.")
             assert testcloud is not None
-            try:
-                self._instance.stop()
-            except testcloud.exceptions.TestcloudInstanceError as error:
-                raise tmt.utils.ProvisionError(
-                    f"Failed to stop testcloud instance: {error}")
+
+            with GuestTestcloud._testcloud_lock:
+                try:
+                    self._instance.stop()
+                except testcloud.exceptions.TestcloudInstanceError as error:
+                    raise tmt.utils.ProvisionError(
+                        f"Failed to stop testcloud instance: {error}")
+
             self.info('guest', 'stopped', 'green')
 
     def remove(self) -> None:
         """ Remove the guest (disk cleanup) """
         if self._instance:
             self.debug(f"Removing testcloud instance '{self.instance_name}'.")
-            try:
-                self._instance.remove(autostop=True)
-            except FileNotFoundError as error:
-                raise tmt.utils.ProvisionError(
-                    f"Failed to remove testcloud instance: {error}")
+
+            with GuestTestcloud._testcloud_lock:
+                try:
+                    self._instance.remove(autostop=True)
+                except FileNotFoundError as error:
+                    raise tmt.utils.ProvisionError(
+                        f"Failed to remove testcloud instance: {error}")
+
             self.info('guest', 'removed', 'green')
 
     def reboot(self,
@@ -786,7 +810,8 @@ def reboot(self,
             return super().reboot(hard=hard, command=command)
         if not self._instance:
            raise tmt.utils.ProvisionError("No instance initialized.")
-        self._instance.reboot(soft=not hard)
+        with GuestTestcloud._testcloud_lock:
+            self._instance.reboot(soft=not hard)
 
         return self.reconnect(timeout=timeout)
 
@@ -841,6 +866,8 @@ class ProvisionTestcloud(tmt.steps.provision.ProvisionPlugin[ProvisionTestcloudD
     _data_class = ProvisionTestcloudData
     _guest_class = GuestTestcloud
 
+    _thread_safe = True
+
     # Guest instance
     _guest = None
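
For illustration, a minimal sketch of how a provision plugin opts into the parallel provisioning introduced by this patch. ``ProvisionExample``, ``ProvisionExampleData`` and ``GuestExample`` are hypothetical names, not part of the patch; the only real knob is the ``_thread_safe`` class attribute used above:

import dataclasses

import tmt.steps.provision
from tmt.steps.provision import Guest, ProvisionStepData


@dataclasses.dataclass
class ProvisionExampleData(ProvisionStepData):
    # A hypothetical step data class; no extra keys needed for this sketch.
    pass


class GuestExample(Guest):
    # A hypothetical guest class; a real plugin would implement start(),
    # stop(), remove() and friends.
    pass


class ProvisionExample(tmt.steps.provision.ProvisionPlugin[ProvisionExampleData]):
    """ A hypothetical provision plugin safe for parallel provisioning """

    _data_class = ProvisionExampleData
    _guest_class = GuestExample

    # Opting in: the provision step may now call this phase's go() from a
    # worker thread, batched with other thread-safe phases. Plugins that
    # share mutable state across instances (see the testcloud lock above)
    # must keep the default False or guard that state themselves.
    _thread_safe = True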