Skip to content

Commit

Permalink
Change to schedule_job and always return job id
Browse files Browse the repository at this point in the history
  • Loading branch information
mdegat01 committed Jan 18, 2024
1 parent 7a4a9a5 commit bec47e9
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 123 deletions.
100 changes: 52 additions & 48 deletions supervisor/api/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from aiohttp.hdrs import CONTENT_DISPOSITION
import voluptuous as vol

from ..backups.backup import Backup
from ..backups.validate import ALL_FOLDERS, FOLDER_HOMEASSISTANT, days_until_stale
from ..const import (
ATTR_ADDONS,
Expand Down Expand Up @@ -39,6 +40,7 @@
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError
from ..jobs import JobSchedulerOptions
from ..mounts.const import MountUsage
from ..resolution.const import UnhealthyReason
from .const import ATTR_BACKGROUND, ATTR_JOB_ID, CONTENT_TYPE_TAR
Expand Down Expand Up @@ -214,104 +216,106 @@ def _location_to_mount(self, body: dict[str, Any]) -> dict[str, Any]:

async def _background_backup_task(
self, backup_method: Callable, *args, **kwargs
) -> dict[str, str] | bool:
"""Start backup task in background and return result."""
) -> tuple[asyncio.Task, str]:
"""Start backup task in background and return task and job ID."""
event = asyncio.Event()
job, backup_task = self.sys_jobs.schedule_job(
backup_method, JobSchedulerOptions(), *args, **kwargs
)

async def release_on_freeze(new_state: CoreState):
if new_state == CoreState.FREEZE:
event.set()

# Wait for system to get into freeze state before returning
# If the backup fails validation it will raise before getting there
listener = self.sys_bus.register_event(
BusEvent.SUPERVISOR_STATE_CHANGE, release_on_freeze
)
try:
backup_task = self.sys_create_task(backup_method(*args, **kwargs))
await asyncio.wait(
(
backup_task,
self.sys_create_task(event.wait()),
),
return_when=asyncio.FIRST_COMPLETED,
)
if self.sys_backups.active_job:
return {ATTR_JOB_ID: self.sys_backups.active_job.uuid}
return False
return (backup_task, job.uuid)
finally:
self.sys_bus.remove_listener(listener)

@api_process
async def backup_full(self, request):
"""Create full backup."""
body = await api_validate(SCHEMA_BACKUP_FULL, request)

if body.pop(ATTR_BACKGROUND, False):
return await self._background_backup_task(
self.sys_backups.do_backup_full, **self._location_to_mount(body)
)

backup = await asyncio.shield(
self.sys_backups.do_backup_full(
_job_override__cleanup=True, **self._location_to_mount(body)
)
background = body.pop(ATTR_BACKGROUND, False)
backup_task, job_id = await self._background_backup_task(
self.sys_backups.do_backup_full, **self._location_to_mount(body)
)

if background:
return {ATTR_JOB_ID: job_id}

backup: Backup = await backup_task
if backup:
return {ATTR_SLUG: backup.slug}
return False
return {ATTR_JOB_ID: job_id, ATTR_SLUG: backup.slug}
raise APIError(
f"An error occurred while making backup, check job '{job_id}' or supervisor logs for details",
job_id=job_id,
)

@api_process
async def backup_partial(self, request):
"""Create a partial backup."""
body = await api_validate(SCHEMA_BACKUP_PARTIAL, request)

if body.pop(ATTR_BACKGROUND, False):
return await self._background_backup_task(
self.sys_backups.do_backup_partial, **self._location_to_mount(body)
)

backup = await asyncio.shield(
self.sys_backups.do_backup_partial(
_job_override__cleanup=True, **self._location_to_mount(body)
)
background = body.pop(ATTR_BACKGROUND, False)
backup_task, job_id = await self._background_backup_task(
self.sys_backups.do_backup_partial, **self._location_to_mount(body)
)

if background:
return {ATTR_JOB_ID: job_id}

backup: Backup = await backup_task
if backup:
return {ATTR_SLUG: backup.slug}
return False
return {ATTR_JOB_ID: job_id, ATTR_SLUG: backup.slug}
raise APIError(
f"An error occurred while making backup, check job '{job_id}' or supervisor logs for details",
job_id=job_id,
)

@api_process
async def restore_full(self, request):
"""Full restore of a backup."""
backup = self._extract_slug(request)
body = await api_validate(SCHEMA_RESTORE_FULL, request)
background = body.pop(ATTR_BACKGROUND, False)
restore_task, job_id = await self._background_backup_task(
self.sys_backups.do_restore_full, backup, **body
)

if body.pop(ATTR_BACKGROUND, False):
return await self._background_backup_task(
self.sys_backups.do_restore_full, backup, **body
)

return await asyncio.shield(
self.sys_backups.do_restore_full(
backup, _job_override__cleanup=True, **body
)
if background or await restore_task:
return {ATTR_JOB_ID: job_id}
raise APIError(
f"An error occurred during restore of {backup.slug}, check job '{job_id}' or supervisor logs for details",
job_id=job_id,
)

@api_process
async def restore_partial(self, request):
"""Partial restore a backup."""
backup = self._extract_slug(request)
body = await api_validate(SCHEMA_RESTORE_PARTIAL, request)
background = body.pop(ATTR_BACKGROUND, False)
restore_task, job_id = await self._background_backup_task(
self.sys_backups.do_restore_partial, backup, **body
)

if body.pop(ATTR_BACKGROUND, False):
return await self._background_backup_task(
self.sys_backups.do_restore_partial, backup, **body
)

return await asyncio.shield(
self.sys_backups.do_restore_partial(
backup, _job_override__cleanup=True, **body
)
if background or await restore_task:
return {ATTR_JOB_ID: job_id}
raise APIError(
f"An error occurred during restore of {backup.slug}, check job '{job_id}' or supervisor logs for details",
job_id=job_id,
)

@api_process
Expand Down
13 changes: 9 additions & 4 deletions supervisor/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
HEADER_TOKEN,
HEADER_TOKEN_OLD,
JSON_DATA,
JSON_JOB_ID,
JSON_MESSAGE,
JSON_RESULT,
REQUEST_FROM,
Expand Down Expand Up @@ -124,11 +125,15 @@ def api_return_error(
if check_exception_chain(error, DockerAPIError):
message = format_message(message)

result = {
JSON_RESULT: RESULT_ERROR,
JSON_MESSAGE: message or "Unknown error, see supervisor",
}
if isinstance(error, APIError) and error.job_id:
result[JSON_JOB_ID] = error.job_id

return web.json_response(
{
JSON_RESULT: RESULT_ERROR,
JSON_MESSAGE: message or "Unknown error, see supervisor",
},
result,
status=400,
dumps=json_dumps,
)
Expand Down
4 changes: 3 additions & 1 deletion supervisor/backups/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,9 @@ def _restore() -> bool:
gzip=self.compressed,
bufsize=BUF_SIZE,
) as tar_file:
tar_file.extractall(path=origin_dir, members=tar_file)
tar_file.extractall(
path=origin_dir, members=tar_file, filter="fully_trusted"
)
_LOGGER.info("Restore folder %s done", name)
except (tarfile.TarError, OSError) as err:
raise BackupError(
Expand Down
5 changes: 4 additions & 1 deletion supervisor/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
JSON_DATA = "data"
JSON_MESSAGE = "message"
JSON_RESULT = "result"
JSON_JOB_ID = "job_id"

RESULT_ERROR = "error"
RESULT_OK = "ok"
Expand Down Expand Up @@ -458,9 +459,11 @@ class HostFeature(StrEnum):
class BusEvent(StrEnum):
"""Bus event type."""

DOCKER_CONTAINER_STATE_CHANGE = "docker_container_state_change"
HARDWARE_NEW_DEVICE = "hardware_new_device"
HARDWARE_REMOVE_DEVICE = "hardware_remove_device"
DOCKER_CONTAINER_STATE_CHANGE = "docker_container_state_change"
SUPERVISOR_JOB_END = "supervisor_job_end"
SUPERVISOR_JOB_START = "supervisor_job_start"
SUPERVISOR_STATE_CHANGE = "supervisor_state_change"


Expand Down
32 changes: 29 additions & 3 deletions supervisor/coresys.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,29 @@ def run_in_executor(

return self.loop.run_in_executor(None, funct, *args)

def create_task(self, coroutine: Coroutine) -> asyncio.Task:
"""Create an async task."""
def _create_context(self) -> Context:
"""Create a new context for a task."""
context = copy_context()
for callback in self._set_task_context:
context = callback(context)
return context

def create_task(self, coroutine: Coroutine) -> asyncio.Task:
"""Create an async task."""
return self.loop.create_task(coroutine, context=self._create_context())

def run_later(
self,
funct: Callable[..., Coroutine[Any, Any, T]],
delay: float,
*args: tuple[Any],
**kwargs: dict[str, Any],
) -> asyncio.TimerHandle:
"""Start a task after a delay."""
if kwargs:
funct = partial(funct, **kwargs)

return self.loop.create_task(coroutine, context=context)
return self.loop.call_later(delay, funct, *args, context=self._create_context())


class CoreSysAttributes:
Expand Down Expand Up @@ -731,3 +747,13 @@ def sys_run_in_executor(
def sys_create_task(self, coroutine: Coroutine) -> asyncio.Task:
"""Create an async task."""
return self.coresys.create_task(coroutine)

def sys_run_later(
self,
funct: Callable[..., Coroutine[Any, Any, T]],
delay: float,
*args: tuple[Any],
**kwargs: dict[str, Any],
) -> asyncio.TimerHandle:
"""Start a task after a delay."""
return self.coresys.run_later(funct, delay, *args, **kwargs)
10 changes: 10 additions & 0 deletions supervisor/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,16 @@ class HostLogError(HostError):
class APIError(HassioError, RuntimeError):
"""API errors."""

def __init__(
self,
message: str | None = None,
logger: Callable[..., None] | None = None,
job_id: str | None = None,
) -> None:
"""Raise & log, optionally with job."""
super().__init__(message, logger)
self.job_id = job_id


class APIForbidden(APIError):
"""API forbidden error."""
Expand Down
Loading

0 comments on commit bec47e9

Please sign in to comment.