From dee4ce7e90d12ca3d163671d3eae074dc322bb23 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Fri, 20 Sep 2024 13:45:52 -0400 Subject: [PATCH] refactor(py-sdk): allow a better way to integrate w/ Silverback --- pyproject.toml | 2 + sdk/py/apepay/daemon.py | 140 -------------------------------------- sdk/py/apepay/manager.py | 78 ++++++++++++++++++++- sdk/py/apepay/settings.py | 34 --------- 4 files changed, 78 insertions(+), 176 deletions(-) delete mode 100644 sdk/py/apepay/daemon.py delete mode 100644 sdk/py/apepay/settings.py diff --git a/pyproject.toml b/pyproject.toml index fc3cae1e..9a633fc0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,8 @@ lint = [ "black", "isort", "mypy", + # NOTE: Be able to lint our silverback add-ons + "apepay[bot]" ] test = [ "ape-foundry", diff --git a/sdk/py/apepay/daemon.py b/sdk/py/apepay/daemon.py deleted file mode 100644 index 310ae5c4..00000000 --- a/sdk/py/apepay/daemon.py +++ /dev/null @@ -1,140 +0,0 @@ -import asyncio -import os -from datetime import timedelta -from enum import Enum - -import click -from ape.types import AddressType -from silverback import SilverbackApp - -from apepay import Stream, StreamManager - -from .settings import Settings - -settings = Settings() - - -class Status(Enum): - NORMAL = "normal" - WARNING = "warning" - CRITICAL = "critical" - INACTIVE = "inactive" - - @classmethod - def from_time_left(cls, time_left: timedelta) -> "Status": - if time_left > settings.WARNING_LEVEL: - return cls.NORMAL - - elif time_left > settings.CRITICAL_LEVEL: - return cls.WARNING - - elif time_left.total_seconds() > 0: - return cls.CRITICAL - - else: - return cls.INACTIVE - - -SM = StreamManager( - address=os.environ.get("APEPAY_CONTRACT_ADDRESS") - or click.prompt("What address to use?", type=AddressType) -) - -app = SilverbackApp() - - -async def create_task_by_status(stream: Stream): - time_left = stream.time_left - stream_status = Status.from_time_left(time_left) - - task = { - Status.NORMAL: stream_funding_normal_level, - Status.WARNING: stream_funding_warning_level, - Status.CRITICAL: stream_funding_critical_level, - Status.INACTIVE: stream_cancelled, - }[stream_status] - - if stream_status is Status.INACTIVE: - await task.kiq(stream.to_event()) - - else: - await task.kicker().with_labels(time_left=time_left).kiq(stream) - - return {"status": stream_status.value} - - -@app.on_startup() -async def app_started(state): - return await asyncio.gather( - # Start watching all active streams and claim any completed but unclaimed streams - *( - create_task_by_status(stream) - for stream in SM.all_streams() - if stream.is_active or stream.amount_unlocked > 0 - ) - ) - - -@app.on_(SM.contract.StreamCreated) -async def stream_created(event): - stream = Stream.from_event( - manager=SM, - event=event, - is_creation_event=True, - ) - - return await create_task_by_status(stream) - - -@app.broker.task(task_name="stream/normal") -async def stream_funding_normal_level(stream: Stream): - while Status.from_time_left(stream.time_left) is Status.NORMAL: - # Wait until we're in warning range - await asyncio.sleep((stream.time_left - settings.WARNING_LEVEL).total_seconds()) - - # Check if stream has been cancelled - if Status.from_time_left(stream.time_left) is Status.WARNING: - # TODO: Trigger funding warning notification - print(f"Warning: only {stream.time_left} left") - - elif Status.from_time_left(stream.time_left) is Status.CRITICAL: - # TODO: Trigger funding critical notification - print(f"Critical: only {stream.time_left} left") - - return await create_task_by_status(stream) - - -@app.broker.task(task_name="stream/warning") -async def stream_funding_warning_level(stream: Stream): - while Status.from_time_left(stream.time_left) is Status.WARNING: - # Wait for critical - await asyncio.sleep((stream.time_left - settings.CRITICAL_LEVEL).total_seconds()) - - # Check if stream has been cancelled - if Status.from_time_left(stream.time_left) is Status.CRITICAL: - # TODO: Trigger funding critical notification - print(f"Critical: only {stream.time_left} left") - - return await create_task_by_status(stream) - - -@app.broker.task(task_name="stream/critical") -async def stream_funding_critical_level(stream: Stream): - while Status.from_time_left(stream.time_left) is Status.CRITICAL: - # Wait until there's no time left - await asyncio.sleep(stream.time_left.total_seconds()) - - return await create_task_by_status(stream) - - -@app.on_(SM.contract.StreamCancelled) -async def stream_cancelled(event): - stream = Stream( - manager=SM, - creator=event.creator, - stream_id=event.stream_id, - ) - if app.signer and stream.amount_unlocked > 0: - stream.claim(sender=app.signer) - - return {"claimed": stream.amount_unlocked} diff --git a/sdk/py/apepay/manager.py b/sdk/py/apepay/manager.py index a26f1aa4..53adceb8 100644 --- a/sdk/py/apepay/manager.py +++ b/sdk/py/apepay/manager.py @@ -2,10 +2,15 @@ from collections.abc import Iterator from datetime import datetime, timedelta from functools import partial, wraps -from typing import Any, Callable, Union, cast +from typing import TYPE_CHECKING, Any, Callable, Union, cast from ape.api import ReceiptAPI -from ape.contracts.base import ContractCallHandler, ContractInstance, ContractTransactionHandler +from ape.contracts.base import ( + ContractCallHandler, + ContractEvent, + ContractInstance, + ContractTransactionHandler, +) from ape.exceptions import ContractLogicError, DecodingError from ape.types import AddressType, HexBytes from ape.utils import BaseInterfaceModel, cached_property @@ -18,6 +23,10 @@ from .utils import time_unit_to_timedelta from .validators import Validator +if TYPE_CHECKING: + # NOTE: We really only use this for type checking, optional install + from silverback import SilverbackApp + MAX_DURATION_SECONDS = int(timedelta.max.total_seconds()) - 1 _ValidatorItem = Union[Validator, ContractInstance, AddressType] @@ -192,6 +201,71 @@ def create( is_creation_event=True, ) + def _parse_stream_decorator(self, app: "SilverbackApp", container: ContractEvent): + + def decorator(f): + + @app.on_(container) + @wraps(f) + def inner(log): + return f(Stream(manager=self, creator=log.creator, stream_id=log.stream_id)) + + return inner + + return decorator + + def on_stream_created(self, app: "SilverbackApp"): + """ + Usage example:: + + app = SilverbackApp() + sm = StreamManager(address=...) + + sm.on_stream_created(app) + def do_something(stream): + ... # Use `stream` to update your infrastructure + """ + return self._parse_stream_decorator(app, self.contract.StreamCreated) + + def on_stream_funded(self, app: "SilverbackApp"): + """ + Usage example:: + + app = SilverbackApp() + sm = StreamManager(address=...) + + sm.on_stream_funded(app) + def do_something(stream): + ... # Use `stream` to update your infrastructure + """ + return self._parse_stream_decorator(app, self.contract.StreamFunded) + + def on_stream_claimed(self, app: "SilverbackApp"): + """ + Usage example:: + + app = SilverbackApp() + sm = StreamManager(address=...) + + sm.on_stream_claimed(app) + def do_something(stream): + ... # Use `stream` to update your infrastructure + """ + return self._parse_stream_decorator(app, self.contract.Claimed) + + def on_stream_cancelled(self, app: "SilverbackApp"): + """ + Usage example:: + + app = SilverbackApp() + sm = StreamManager(address=...) + + sm.on_stream_cancelled(app) + def do_something(stream): + ... # Use `stream` to update your infrastructure + """ + return self._parse_stream_decorator(app, self.contract.StreamCancelled) + def streams_by_creator(self, creator: AddressType) -> Iterator["Stream"]: for stream_id in range(self.contract.num_streams(creator)): yield Stream(manager=self, creator=creator, stream_id=stream_id) diff --git a/sdk/py/apepay/settings.py b/sdk/py/apepay/settings.py deleted file mode 100644 index 6974dd27..00000000 --- a/sdk/py/apepay/settings.py +++ /dev/null @@ -1,34 +0,0 @@ -from datetime import timedelta -from typing import Any - -from pydantic import BaseSettings, validator - -from apepay.utils import time_unit_to_timedelta - - -class Settings(BaseSettings): - WARNING_LEVEL: timedelta = timedelta(days=2) - CRITICAL_LEVEL: timedelta = timedelta(hours=12) - - @validator("WARNING_LEVEL", "CRITICAL_LEVEL", pre=True) - def _normalize_timedelta(cls, value: Any) -> timedelta: - if isinstance(value, timedelta): - return value - - elif isinstance(value, int): - return timedelta(seconds=value) - - elif not isinstance(value, str): - raise ValueError(f"Cannot convert: {value}") - - elif ":" in value and len(value.split(":")) == 3: - h, m, s = value.split(":") - return timedelta(hours=int(h), minutes=int(m), seconds=int(s)) - - else: - multiplier, time_unit = value.split(" ") - return int(multiplier) * time_unit_to_timedelta(time_unit) - - class Config: - env_prefix = "APEPAY_" - case_sensitive = True