Skip to content

Commit

Permalink
squash: fix stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
happz committed Nov 21, 2023
1 parent 888a537 commit 1ce652a
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 20 deletions.
2 changes: 1 addition & 1 deletion tests/login/step.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ rlJournalStart
rlAssertGrep "interactive" "output"

if [ "$step" = "provision" ]; then
rlRun "grep '^ $step$' -A6 output | grep -i interactive"
rlRun "grep '^ $step$' -A12 output | grep -i interactive"
elif [ "$step" = "prepare" ]; then
rlRun "grep '^ $step$' -A8 output | grep -i interactive"
elif [ "$step" = "execute" ]; then
Expand Down
8 changes: 4 additions & 4 deletions tests/multihost/complete/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ function check_current_topology () {
}

function check_shared_topology () {
rlAssertEquals "Guest names" "client-1 client-2 server" "$(yq -r '."guest-names" | join(" ")' $1)"
rlAssertEquals "Role names" "client server" "$(yq -r '."role-names" | join(" ")' $1)"
rlAssertEquals "Client role guests" "client-1 client-2" "$(yq -r '.roles.client | join(" ")' $1)"
rlAssertEquals "Server role guests" "server" "$(yq -r '.roles.server | join(" ")' $1)"
rlAssertEquals "Guest names" "client-1 client-2 server" "$(yq -r '."guest-names" | sort | join(" ")' $1)"
rlAssertEquals "Role names" "client server" "$(yq -r '."role-names" | sort | join(" ")' $1)"
rlAssertEquals "Client role guests" "client-1 client-2" "$(yq -r '.roles.client | sort | join(" ")' $1)"
rlAssertEquals "Server role guests" "server" "$(yq -r '.roles.server | sort | join(" ")' $1)"

rlAssertEquals "Guest client-1 name" "client-1" "$(yq -r '.guests["client-1"].name' $1)"
rlAssertEquals "Guest client-1 role" "client" "$(yq -r '.guests["client-1"].role' $1)"
Expand Down
27 changes: 18 additions & 9 deletions tmt/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import TYPE_CHECKING, Generic, Optional, TypeVar

import tmt.utils
from tmt.log import Logger

if TYPE_CHECKING:
Expand Down Expand Up @@ -82,6 +83,8 @@ def prepare_loggers(logger: Logger, labels: list[str]) -> dict[str, Logger]:

@dataclasses.dataclass
class GuestlessTask(Task[TaskResultT]):
""" A task not assigned to a particular set of guests """

def run(self, logger: Logger) -> TaskResultT:
raise NotImplementedError

Expand All @@ -103,15 +106,21 @@ def go(self) -> Iterator['Self']:

@dataclasses.dataclass
class MultiGuestTask(Task[TaskResultT]):
""" A task assigned to a particular set of guests """

guests: list['Guest']

@tmt.utils.cached_property
def guest_ids(self) -> List[str]:
return sorted([guest.multihost_name for guest in self.guests])

def run_on_guest(self, guest: 'Guest', logger: Logger) -> None:
raise NotImplementedError

def go(self) -> Iterator['Self']:
multiple_guests = len(self.guests) > 1

new_loggers = prepare_loggers(self.logger, [guest.name for guest in self.guests])
new_loggers = prepare_loggers(self.logger, [guest.multihost_name for guest in self.guests])
old_loggers: dict[str, Logger] = {}

with ThreadPoolExecutor(max_workers=len(self.guests)) as executor:
Expand All @@ -129,8 +138,8 @@ def go(self) -> Iterator['Self']:
# well, then the phase would pass the given logger to guest
# methods when it calls them, propagating the single logger we
# prepared...
old_loggers[guest.name] = guest._logger
new_logger = new_loggers[guest.name]
old_loggers[guest.multihost_name] = guest._logger
new_logger = new_loggers[guest.multihost_name]

guest.inject_logger(new_logger)

Expand All @@ -149,8 +158,8 @@ def go(self) -> Iterator['Self']:
for future in as_completed(futures):
guest = futures[future]

old_logger = old_loggers[guest.name]
new_logger = new_loggers[guest.name]
old_logger = old_loggers[guest.multihost_name]
new_logger = new_loggers[guest.multihost_name]

if multiple_guests:
new_logger.info('finished', color='cyan')
Expand All @@ -177,7 +186,7 @@ def go(self) -> Iterator['Self']:


class Queue(list[TaskT]):
""" Queue class for running phases on guests """
""" Queue class for running tasks """

def __init__(self, name: str, logger: Logger) -> None:
super().__init__()
Expand All @@ -197,10 +206,10 @@ def enqueue_task(self, task: TaskT) -> None:

def run(self) -> Iterator[TaskT]:
"""
Start crunching the queued phases.
Start crunching the queued tasks.
Queued tasks are executed in the order, for each task/guest
combination a :py:class:`TaskOutcome` instance is yielded.
Tasks are executed in the order, for each task/guest
combination a :py:class:`Task` instance is yielded.
"""

for i, task in enumerate(self):
Expand Down
12 changes: 8 additions & 4 deletions tmt/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1939,6 +1939,8 @@ def push(

@dataclasses.dataclass
class ActionTask(tmt.queue.GuestlessTask[None]):
""" A task to run an action """

phase: Action

@property
Expand All @@ -1951,6 +1953,8 @@ def run(self, logger: tmt.log.Logger) -> None:

@dataclasses.dataclass
class PluginTask(tmt.queue.MultiGuestTask[None], Generic[StepDataT]):
""" A task to run a phase on a given set of guests """

phase: Plugin[StepDataT]

@property
Expand All @@ -1967,8 +1971,8 @@ def phase_name(self) -> str:

@property
def name(self) -> str:
return f'{self.phase.name} ' \
f'on {fmf.utils.listed([guest.multihost_name for guest in self.guests])}'
return f'{self.phase_name} ' \
f'on {fmf.utils.listed(self.guest_ids)}'

def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> None:
self.phase.go(guest=guest, logger=logger)
Expand Down Expand Up @@ -2014,7 +2018,7 @@ class PushTask(tmt.queue.MultiGuestTask[None]):

@ property
def name(self) -> str:
return f'push to {fmf.utils.listed([guest.multihost_name for guest in self.guests])}'
return f'push to {fmf.utils.listed(self.guest_ids)}'

def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> None:
guest.push()
Expand All @@ -2028,7 +2032,7 @@ class PullTask(tmt.queue.MultiGuestTask[None]):

@ property
def name(self) -> str:
return f'pull from {fmf.utils.listed([guest.multihost_name for guest in self.guests])}'
return f'pull from {fmf.utils.listed(self.guest_ids)}'

def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> None:
guest.pull(source=self.source)
Expand Down
23 changes: 21 additions & 2 deletions tmt/steps/provision/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1786,7 +1786,14 @@ def show(self, keys: Optional[list[str]] = None) -> None:

@dataclasses.dataclass
class ProvisionTask(tmt.queue.GuestlessTask[None]):
""" A task to run provisioning of multiple guest """

#: Phases describing guests to provision. In the ``provision`` step,
#: each phase describes one guest.
phases: list[ProvisionPlugin[ProvisionStepData]]

#: When ``ProvisionTask`` instance is received from the queue, ``phase``
#: points to the phase that has been provisioned for by the task.
phase: Optional[ProvisionPlugin[ProvisionStepData]] = None

@property
Expand Down Expand Up @@ -1862,6 +1869,8 @@ def go(self) -> Iterator['ProvisionTask']:


class ProvisionQueue(tmt.queue.Queue[ProvisionTask]):
""" Queue class for running provisioning tasks """

def enqueue(
self,
*,
Expand Down Expand Up @@ -1898,7 +1907,10 @@ def __init__(
# List of provisioned guests and loaded guest data
self._guests: list[Guest] = []
self._guest_data: dict[str, GuestData] = {}
self.is_multihost = False

@property
def is_multihost(self) -> bool:
return len(self.data) > 1

def load(self) -> None:
""" Load guest data from the workdir """
Expand Down Expand Up @@ -1980,7 +1992,7 @@ def go(self) -> None:
# and no other actions should be done. An example of this is
# listing available images. In such case, the workdir is deleted
# as it's redundant and save() would throw an error.
nonsaveable_exceptions = (SystemExit, tmt.utils.SpecificationError)
# nonsaveable_exceptions = (SystemExit, tmt.utils.SpecificationError)
save = True

def _run_provision_phases(
Expand Down Expand Up @@ -2033,12 +2045,19 @@ def _run_action_phases(phases: list[Action]) -> list[ActionTask]:

return failed_tasks

# Provisioning phases may be intermixed with actions. To perform all
# phases and action in a consistent manner, we will process them in
# the order or their `order` key. We will group provisioning phases
# not interrupted by action into batches, and run the sequence of
# provisioning phases in parallel.
all_phases = self.phases(classes=(Action, ProvisionPlugin))
all_phases.sort(key=lambda x: x.order)

failed_tasks: list[Union[ActionTask, ProvisionTask]] = []

while all_phases:
# Start looking for sequences of phases of the same kind. Collect
# as many as possible, until hitting a different one
phase = all_phases.pop(0)

if isinstance(phase, Action):
Expand Down

0 comments on commit 1ce652a

Please sign in to comment.