Skip to content

Commit

Permalink
feat: patch Service.launch to ensure 3 stages
Browse files Browse the repository at this point in the history
  • Loading branch information
RF-Tar-Railt committed Jan 3, 2025
1 parent 449429b commit 702f3b0
Show file tree
Hide file tree
Showing 19 changed files with 203 additions and 40 deletions.
4 changes: 3 additions & 1 deletion _bootstrap/_resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ def resolve_dependencies(
result: list[list[str]] = []

while unresolved:
layer_candidates = [service for service in unresolved.values() if resolved_id.issuperset(dependencies_map[service.id])]
layer_candidates = [
service for service in unresolved.values() if resolved_id.issuperset(dependencies_map[service.id])
]

if not layer_candidates:
raise TypeError("Failed to resolve requirements due to cyclic dependencies or unmet constraints.")
Expand Down
17 changes: 10 additions & 7 deletions _bootstrap/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@

import asyncio
import signal
from typing import TYPE_CHECKING, Any, Iterable
from contextvars import ContextVar
from typing import TYPE_CHECKING, Any, Iterable

from exceptiongroup import BaseExceptionGroup # noqa: A004
from loguru import logger

from .utiles import TaskGroup, any_completed, cvar, unity

from ._resolve import resolve_dependencies, validate_services_removal
from .context import ServiceContext
from .status import Phase, Stage
from .utiles import cancel_alive_tasks
from .utiles import TaskGroup, any_completed, cancel_alive_tasks, cvar, unity

if TYPE_CHECKING:
from .service import Service
Expand Down Expand Up @@ -95,7 +93,8 @@ async def start_lifespan(
self.contexts[i.get_name()].dispatch_online()

await self._handle_stage_cleanup(
[self.services[i.get_name()] for i in done] + [self.services[i.get_name()] for i in curr if not i.done()]
[self.services[i.get_name()] for i in done]
+ [self.services[i.get_name()] for i in curr if not i.done()]
)

return _dummy_online
Expand Down Expand Up @@ -178,7 +177,9 @@ async def _handle_stage_cleanup(self, services: Iterable[Service], *, trigger_ex
self._sigexit_trig([service_bind[i] for i in layer])

awaiting_daemon_exit = asyncio.create_task(any_completed(daemon_tasks))
awaiting_dispatch_ready = unity([i.wait_for(Stage.CLEANUP, Phase.WAITING) for i in _contexts.values()]) # awaiting_prepare
awaiting_dispatch_ready = unity(
[i.wait_for(Stage.CLEANUP, Phase.WAITING) for i in _contexts.values()]
) # awaiting_prepare
completed_task, _ = await any_completed([awaiting_daemon_exit, awaiting_dispatch_ready])

if completed_task is awaiting_daemon_exit:
Expand Down Expand Up @@ -210,7 +211,9 @@ async def launch(self):
with cvar(BOOTSTRAP_CONTEXT, self):
failed = []

online_dispatch = await self.start_lifespan(self.initial_services.values(), failed_record=failed, rollback=True)
online_dispatch = await self.start_lifespan(
self.initial_services.values(), failed_record=failed, rollback=True
)
offline_callback = online_dispatch()

try:
Expand Down
2 changes: 1 addition & 1 deletion _bootstrap/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ class Phase(int, Enum):
COMPLETED = 2


ServiceStatusValue = tuple[Stage, Phase]
ServiceStatusValue = tuple[Stage, Phase]
3 changes: 1 addition & 2 deletions _bootstrap/utiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
from contextlib import contextmanager
from typing import TYPE_CHECKING, Coroutine, Iterable, TypeVar

from typing_extensions import TypeAlias
from loguru import logger

from typing_extensions import TypeAlias

if TYPE_CHECKING:
from contextvars import ContextVar
Expand Down
5 changes: 3 additions & 2 deletions launart/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from contextlib import suppress as _suppress

from _bootstrap._resolve import RequirementResolveFailed as RequirementResolveFailed # noqa: F401
from _bootstrap._resolve import ( # noqa: F401
RequirementResolveFailed as RequirementResolveFailed,
)
from _bootstrap.utiles import any_completed as any_completed # noqa: F401

from .manager import Launart as Launart
from .service import Service as Service


with _suppress(ImportError, ModuleNotFoundError):
from .saya import LaunartBehaviour as LaunartBehaviour
from .saya import ServiceSchema as ServiceSchema
Expand Down
151 changes: 151 additions & 0 deletions launart/_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from __future__ import annotations

import ast
import inspect
from collections.abc import Awaitable
from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
from launart.manager import Launart
from launart.service import Service


def _dummy_blocking(lineno: int, indent: int) -> ast.AsyncWith:
return ast.AsyncWith(
items=[
ast.withitem(
context_expr=ast.Call(
func=ast.Attribute(
value=ast.Name(
id="self",
ctx=ast.Load(),
lineno=lineno,
col_offset=19 + indent,
end_lineno=lineno,
end_col_offset=23 + indent,
),
attr="stage",
ctx=ast.Load(),
lineno=lineno,
col_offset=19 + indent,
end_lineno=lineno,
end_col_offset=29 + indent,
),
args=[
ast.Constant(
value="blocking",
lineno=lineno,
col_offset=30 + indent,
end_lineno=lineno,
end_col_offset=40 + indent,
)
],
keywords=[],
lineno=lineno,
col_offset=19 + indent,
end_lineno=lineno,
end_col_offset=41 + indent,
),
)
],
body=[ast.Pass(lineno=lineno + 1, col_offset=12 + indent, end_lineno=lineno + 1, end_col_offset=16 + indent)],
lineno=lineno,
col_offset=8 + indent,
end_lineno=lineno + 1,
end_col_offset=16 + indent,
)


def patch_launch(serv: Service) -> Callable[[Launart], Awaitable[None]]:
if serv.stages == {"preparing", "blocking", "cleanup"}:
return serv.launch
elif not serv.stages:

async def _launch(manager: Launart):
await serv.launch(manager)
async with serv.stage("preparing"):
pass
async with serv.stage("blocking"):
pass
async with serv.stage("cleanup"):
pass

return _launch
elif serv.stages == {"preparing", "blocking"}:

async def _launch(manager: Launart):
await serv.launch(manager)
async with serv.stage("cleanup"):
pass

return _launch
elif serv.stages == {"blocking", "cleanup"}:

async def _launch(manager: Launart):
async with serv.stage("preparing"):
pass
await serv.launch(manager)

return _launch
elif serv.stages == {"preparing"}:

async def _launch(manager: Launart):
await serv.launch(manager)
async with serv.stage("blocking"):
pass
async with serv.stage("cleanup"):
pass

return _launch
elif serv.stages == {"blocking"}:

async def _launch(manager: Launart):
async with serv.stage("preparing"):
pass
await serv.launch(manager)
async with serv.stage("cleanup"):
pass

return _launch
elif serv.stages == {"cleanup"}:

async def _launch(manager: Launart):
async with serv.stage("preparing"):
pass
async with serv.stage("blocking"):
pass
await serv.launch(manager)

return _launch
else:
nodes = ast.parse(inspect.getsource(serv.__class__))
for node in ast.walk(nodes):
if isinstance(node, ast.AsyncFunctionDef) and node.name == "launch":
break
else:
raise ValueError("this component has no launch method.")
for index, _node in enumerate(node.body):
if isinstance(_node, ast.AsyncWith):
expr = _node.items[0].context_expr
if (
isinstance(expr, ast.Call)
and isinstance(expr.func, ast.Attribute)
and isinstance(expr.func.value, ast.Name)
and expr.func.value.id == "self"
and expr.func.attr == "stage"
):
break
else:
raise ValueError("this component has no stage method.")
new = _dummy_blocking(_node.lineno + 1, node.col_offset - 4)
for other in node.body[index:]:
for n in ast.walk(other):
if hasattr(n, "lineno"):
n.lineno += 2 # type: ignore
if hasattr(n, "end_lineno"):
n.end_lineno += 2 # type: ignore
node.body.insert(index + 1, new)
text = ast.unparse(node)
lcs = {}
exec(text, serv.launch.__globals__, lcs) # noqa
return lcs["launch"].__get__(serv, serv.__class__)
16 changes: 7 additions & 9 deletions launart/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

from loguru import logger

from launart.service import Service, make_service

from _bootstrap import Bootstrap
from _bootstrap import Service as _Service
from _bootstrap.utiles import cvar, cancel_alive_tasks
from _bootstrap.utiles import cancel_alive_tasks, cvar
from launart.service import Service, make_service

from .status import ManagerStatus

Expand All @@ -25,16 +24,14 @@ class Launart:

def __init__(self):
self._core = Bootstrap()
self._default_isolate = {
"interface_provide": {}
}
self._default_isolate = {"interface_provide": {}}

@classmethod
def current(cls) -> Launart:
return cls._context.get()

def export_interface(self, interface: type, service: Service):
self._default_isolate['interface_provide'][interface] = service
self._default_isolate["interface_provide"][interface] = service

def add_component(self, component: Service):
if not self._core.running:
Expand Down Expand Up @@ -77,7 +74,7 @@ def remove_component(
# TODO: remove service during running.

def get_interface(self, interface_type: type[T]) -> T:
provider_map = self._default_isolate['interface_provide']
provider_map = self._default_isolate["interface_provide"]
service = provider_map.get(interface_type)
if service is None:
raise ValueError(f"{interface_type} is not supported.")
Expand All @@ -93,10 +90,11 @@ def launch_blocking(
loop: asyncio.AbstractEventLoop | None = None,
stop_signal: Iterable[signal.Signals] = (signal.SIGINT,),
):
from creart import it
import contextlib
import threading

from creart import it

if loop is not None: # pragma: no cover
from warnings import warn

Expand Down
1 change: 1 addition & 0 deletions launart/saya.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from graia.saya.behaviour import Behaviour
from graia.saya.cube import Cube
from graia.saya.schema import BaseSchema

from launart import Launart, Service


Expand Down
20 changes: 12 additions & 8 deletions launart/service.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from __future__ import annotations

from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, Literal, Optional, Set, ClassVar
from typing import TYPE_CHECKING, ClassVar, Literal, Optional, Set


from _bootstrap.service import Service as BaseService
from _bootstrap.context import ServiceContext
from _bootstrap.status import Stage
from _bootstrap.service import Service as BaseService

from .util import override
from .status import ManagerStatus
from .util import override
from ._patch import patch_launch

if TYPE_CHECKING:
from launart.manager import Launart
Expand All @@ -25,6 +24,11 @@ class Service(metaclass=ABCMeta):
def required(self) -> Set[str]:
...

@property
@abstractmethod
def stages(self) -> Set[Literal["preparing", "blocking", "cleanup"]]:
...

def ensure_manager(self, manager: Launart):
if self.manager is not None and self.manager is not manager:
raise RuntimeError("this component attempted to be mistaken a wrong ownership of launart/manager.")
Expand All @@ -38,8 +42,6 @@ def _ensure_context(self, context: ServiceContext):
def stage(self, stage: Literal["preparing", "blocking", "cleanup"]):
if self._context is None:
raise RuntimeError("attempted to set stage of a component without a context.")
# if self._context._status[0] is Stage.EXIT:
# raise LookupError("attempted to set stage of a component without a current context")
if stage not in {"preparing", "blocking", "cleanup"}:
raise ValueError(f"undefined and unexpected stage entering: {stage}")
ctx = self._context
Expand All @@ -59,6 +61,8 @@ async def launch(self, manager: Launart):
def make_service(serv: Service) -> BaseService:
from launart.manager import Launart

launch = patch_launch(serv)

class _Service(BaseService):
id = serv.id
__launart_service__: ClassVar[Service] = serv
Expand All @@ -70,7 +74,7 @@ def dependencies(self):
async def launch(self, context: ServiceContext):
serv._ensure_context(context)
manager = Launart.current()
await serv.launch(override(manager, {"status": ManagerStatus(context)}))
await launch(override(manager, {"status": ManagerStatus(context)}))

b_s = type(serv.__class__.__name__, (_Service,), {})()
return b_s
3 changes: 1 addition & 2 deletions launart/status.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from _bootstrap.context import ServiceContext
from _bootstrap.status import Stage, Phase
from _bootstrap.status import Phase, Stage


class ManagerStatus:

def __init__(self, context: ServiceContext):
self._context = context

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ build-backend = "pdm.backend"
exclude = ["__pypackages__"]

[tool.pytest.ini_options]
python_files = "tests/*"
python_files = "test.py" # TODO: fix tests/*
asyncio_mode = "strict"
norecursedirs = "_saya_mod"
Loading

0 comments on commit 702f3b0

Please sign in to comment.