diff --git a/bots/README.md b/bots/README.md new file mode 100644 index 00000000..7caa58b9 --- /dev/null +++ b/bots/README.md @@ -0,0 +1,47 @@ +# Silverback Example App + +These apps are intended to be examples of how to work with your own deployment of the ApePay +protocol interacting your own infrastructure for a SaaS product, using Silverback for real-time +events. You can host these examples on the Silverback Platform (https://silverback.apeworx.io) or +self-host. + +## Stream Handling + +ApePay has some different events that should trigger updates to your off-chain tracking of the +current status of a Stream, which should also affect the delivery of your product or service. + +The lifecycle of a Stream starts with being created, after which funds can be added at any time to +extend the stream, and finally the Stream can be cancelled at any time (after an initial required +delay). + +At each point in the Stream lifecycle, an update to an off-chain record should be created to ensure +consistency within your product and how it is managing aspects of your microservice architecture, +and your Silverback app can directly trigger changes in those other services or perform those +changes themselves. + +One important thing to note is that while almost every part of the lifecycle has some sort of on- +chain action that occurs when a user performs an action, a Stream can also "run out of time" which +means you should notify your users as the deadline becomes near to prevent service interruptions, +and also necessitates performing "garbage collection" after the Streams run out completely in order +to remove the associated resources that are no longer being paid for (similar to when a Stream is +cancelled manually). + +Lastly, it is important that you understand your own regulatory and reporting requirements and +implement those using a combination of specialized ApePay "validator" contracts as well as trigger- +ing manual cancellation (or review) of the services for breach of terms within your app. +These tasks require a key with access to the Owner role and may be best implemented as a separate +bot to ensure access control restrictions and not interfere with other running bots. + +## Revenue Collection + +One interesting perk of ApePay is that claiming a Stream (aka "earning revenue") is entirely left +to you as an action you can do at whatever frequency you desire (based on revenue cycles, gas +costs, etc.), and can integrate into other various parts of your revenue management systems, tax +tracking, or whatever else is needed to be accounted for (according to your business needs or +jurisdiction). + +It is recommended that you implement this as a separate bot, as optimizing revenue operations can +be a great way to improve overall cost optimization, and also may require advanced access control +rights that your other microservices don't require. We provide an example here of what that might +look like within the example, however please note that a key that has the ability to make +transactions is required for production use. diff --git a/bots/example.py b/bots/example.py new file mode 100644 index 00000000..9322db28 --- /dev/null +++ b/bots/example.py @@ -0,0 +1,50 @@ +import os +from collections import defaultdict + +from ape.types import AddressType +from silverback import SilverbackApp + +from apepay import Stream, StreamManager + +app = SilverbackApp() +# NOTE: This bot assumes you use a new bot per ApePay deployment +sm = StreamManager(os.environ["APEPAY_CONTRACT_ADDRESS"]) + +# NOTE: You would probably want to index your db by network and deployment address, +# if you were operating on multiple networks and/or deployments (for easy lookup) +db: defaultdict[AddressType, list[Stream]] = defaultdict(list) +# TODO: Migrate to `app.state.db` when feature becomes available + + +@app.on_startup() +async def load_db(_): + for stream in sm.active_streams(): + while len(db[stream.creator]) < stream.stream_id: + db[stream.creator].append(None) # Fill with empty values + assert stream.stream_id == len(db[stream.creator]) + db[stream.creator].append(stream) + + +@sm.on_stream_created(app) +async def grant_product(stream): + assert stream.stream_id == len(db[stream.creator]) + db[stream.creator].append(stream) + print(f"provisioning product for {stream.creator}") + return stream.time_left + + +@sm.on_stream_funded(app) +async def update_product_funding(stream): + # NOTE: properties of stream have changed, you may not need to handle this, but typically you + # would want to update `stream.time_left` in db for use in user Stream life notifications + db[stream.creator].pop(stream.stream_id) + db[stream.creator].insert(stream.stream_id, stream) + return stream.time_left + + +@sm.on_stream_cancelled(app) +async def revoke_product(stream): + print(f"unprovisioning product for {stream.creator}") + db[stream.creator].pop(stream.stream_id) + db[stream.creator].insert(stream.stream_id, None) + return stream.time_left diff --git a/pyproject.toml b/pyproject.toml index d476a4fd..9a633fc0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,9 +25,12 @@ bot = [ "silverback>=0.5,<1", ] lint = [ + "flake8", "black", "isort", "mypy", + # NOTE: Be able to lint our silverback add-ons + "apepay[bot]" ] test = [ "ape-foundry", diff --git a/scripts/demo.py b/scripts/demo.py new file mode 100644 index 00000000..e058d3c7 --- /dev/null +++ b/scripts/demo.py @@ -0,0 +1,96 @@ +""" +A demo showing some accounts randomly creating, modifying, and cancelling streams +""" + +import random +from datetime import timedelta + +import click +from ape.cli import ConnectedProviderCommand, ape_cli_context + +from apepay import StreamManager + + +@click.command(cls=ConnectedProviderCommand) +@ape_cli_context() +@click.option("-l", "--min-stream-life", default=0) +@click.option("-n", "--num-accounts", default=10) +@click.option("-b", "--num-blocks", default=1000) +@click.option("-m", "--max-streams", default=10) +@click.option("-c", "--create-stream", type=float, default=0.1) +@click.option("-f", "--fund-stream", type=float, default=0.7) +@click.option("-k", "--cancel-stream", type=float, default=0.2) +def cli( + cli_ctx, + min_stream_life, + num_accounts, + num_blocks, + max_streams, + create_stream, + fund_stream, + cancel_stream, +): + # Initialize experiment + deployer = cli_ctx.account_manager.test_accounts[-1] + token = cli_ctx.local_project.TestToken.deploy(sender=deployer) + sm = StreamManager( + cli_ctx.local_project.StreamManager.deploy( + deployer, min_stream_life, [], [token], sender=deployer + ) + ) + + # Wait for user to start the example SB app... + click.secho( + f"Please run `APEPAY_CONTRACT_ADDRESS={sm.address} silverback run bots.example:app`", + fg="bright_magenta", + ) + if not click.confirm("Start experiment?"): + return + + # Make sure all accounts have some tokens + accounts = cli_ctx.account_manager.test_accounts[:num_accounts] + decimals = token.decimals() + for account in accounts: + token.DEBUG_mint(account, 10_000 * 10**decimals, sender=account) + + # 26 tokens per day + starting_life = timedelta(minutes=5).total_seconds() + starting_tokens = 26 * 10**decimals + funding_amount = 2 * 10**decimals + streams = {a.address: [] for a in accounts} + + while cli_ctx.chain_manager.blocks.head.number < num_blocks: + payer = random.choice(accounts) + + # Do a little garbage collection + for stream in streams[payer.address]: + click.echo(f"{payer}:{stream.stream_id} - {stream.time_left}") + if not stream.is_active: + click.echo(f"Stream '{payer}:{stream.stream_id}' is expired, removing...") + streams[payer.address].remove(stream) + + if len(streams[payer.address]) > 0: + stream = random.choice(streams[payer.address]) + + if token.balanceOf(payer) >= 10 ** (decimals + 1) and random.random() < fund_stream: + click.echo( + f"Stream '{payer}:{stream.stream_id}' is being funded " + f"w/ {funding_amount / 10**decimals:.2f} tokens..." + ) + token.approve(sm.address, funding_amount, sender=payer) + stream.add_funds(funding_amount, sender=payer) + + elif random.random() < cancel_stream: + click.echo(f"Stream '{payer}:{stream.stream_id}' is being cancelled...") + stream.cancel(sender=payer) + streams[payer.address].remove(stream) + + elif token.balanceOf(payer) < starting_tokens: + continue + + elif len(streams[payer.address]) < max_streams and random.random() < create_stream: + click.echo(f"'{payer}' is creating a new stream...") + token.approve(sm.address, starting_tokens, sender=payer) + stream = sm.create(token, int(starting_tokens / starting_life), sender=payer) + streams[payer.address].append(stream) + click.echo(f"Stream '{payer}:{stream.stream_id}' was created successfully.") diff --git a/scripts/example.py b/scripts/example.py deleted file mode 100644 index ad639413..00000000 --- a/scripts/example.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -A simple example showing how to use ApePay in a script. -""" - -from datetime import timedelta - -import click -from ape.cli import ConnectedProviderCommand, ape_cli_context - -from apepay import StreamManager - - -@click.command(cls=ConnectedProviderCommand) -@ape_cli_context() -@click.option( - "--apepay", - "sm", - default="0xb5ed1ef2a90527b402cd7e7d415027cb94e1db4e", - callback=lambda c, p, v: StreamManager(address=v), -) -@click.option("--token", default="0xbc083d97825da7f7182f37fcec51818e196af1ff") -def cli(cli_ctx, network, sm, token): - if network.name != "sepolia-fork": - cli_ctx.abort("Currently, this script only works on sepolia-fork.") - - payer = cli_ctx.account_manager.test_accounts[0] - - # Make sure account can pay. - token = cli_ctx.chain_manager.contracts.instance_at(token) - - # Make sure your payer has 10k tokens. - balance = token.balanceOf(payer) - desired_balance = 10_000 * 10 ** token.decimals() - if balance < desired_balance: - difference = desired_balance - balance - token.DEBUG_mint(payer, difference, sender=payer) - - # Approve the amount it costs for the deployment. - # In this demo, we know it will add up to 26 tokens. - token.approve(sm.contract, 2**256 - 1, sender=payer) - decimals = token.decimals() - - # 26 tokens per day - seconds = timedelta(days=1).total_seconds() - tokens = 26 * 10**decimals - - # Create the stream. - stream = sm.create( - token, - int(tokens / seconds), - reason="1", # The ID of the deployment as a string - sender=payer, - ) - - click.echo(f"Stream '{stream.stream_id}' created successfully by '{stream.creator}'.") diff --git a/sdk/py/apepay/__init__.py b/sdk/py/apepay/__init__.py index dbce6268..09e8e217 100644 --- a/sdk/py/apepay/__init__.py +++ b/sdk/py/apepay/__init__.py @@ -1,444 +1,13 @@ -import importlib -import json -from collections.abc import Iterable, Iterator -from datetime import datetime, timedelta -from decimal import Decimal -from functools import partial -from typing import Any, ClassVar, Union, cast - -from ape.api import ReceiptAPI -from ape.contracts.base import ContractInstance, ContractTransactionHandler -from ape.exceptions import ( - CompilerError, - ContractLogicError, - ContractNotFoundError, - DecodingError, - ProjectError, -) -from ape.types import AddressType, ContractLog, HexBytes -from ape.utils import BaseInterfaceModel, cached_property -from ethpm_types import ContractType, PackageManifest -from pydantic import ValidationError, field_validator - -from .exceptions import ( - FundsNotClaimable, - MissingCreationReceipt, - StreamLifeInsufficient, - TokenNotAccepted, - ValidatorFailed, -) -from .package import MANIFEST -from .utils import time_unit_to_timedelta - -MAX_DURATION_SECONDS = int(timedelta.max.total_seconds()) - 1 - - -class Validator(BaseInterfaceModel): - address: AddressType - - @field_validator("address", mode="before") - def normalize_address(cls, value: Any) -> AddressType: - return cls.conversion_manager.convert(value, AddressType) - - @property - def contract(self) -> ContractInstance: - return MANIFEST.Validator.at(self.address) - - def __hash__(self) -> int: - # NOTE: So `set` works - return self.contract.address.__hash__() - - def __eq__(self, other: Any) -> bool: - if isinstance(other, Validator): - return self.contract.address == other.contract.address - - elif isinstance(other, ContractInstance): - return self.contract.address == other.address - - # Try __eq__ from the other side. - return NotImplemented - - def validate(self, creator, token, amount_per_second, reason) -> bool: - try: - self.contract.validate.call(creator, token, amount_per_second, reason) - return True - - except ContractLogicError: - return False - - -_ValidatorItem = Union[Validator, ContractInstance, str, AddressType] - - -class StreamManager(BaseInterfaceModel): - address: AddressType - - @field_validator("address", mode="before") - def normalize_address(cls, value: Any) -> AddressType: - return cls.conversion_manager.convert(value, AddressType) - - @property - def contract(self) -> ContractInstance: - return MANIFEST.StreamManager.at(self.address) - - def __repr__(self) -> str: - return f"" - - @property - def owner(self) -> AddressType: - return self.contract.owner() - - @property - def validators(self) -> list[Validator]: - validators = [] - - for idx in range(20): - try: - validator_address = self.contract.validators(idx) - - except (ContractLogicError, DecodingError): - # NOTE: Vyper returns no data if not a valid index - break - - validators.append(Validator(address=validator_address)) - - return validators - - def _convert_to_address(self, item: _ValidatorItem) -> str: - if isinstance(item, Validator): - return item.contract.address - elif isinstance(item, ContractInstance): - return item.address - else: - return item - - def set_validators( - self, - validators: list[_ValidatorItem], - **txn_kwargs, - ) -> ReceiptAPI: - if len(validators) >= 20: - raise ValueError("Validators full") - - return self.contract.set_validators( - [self._convert_to_address(v) for v in validators], - **txn_kwargs, - ) - - def add_validators( - self, - *new_validators: Iterable[_ValidatorItem], - **txn_kwargs, - ) -> ReceiptAPI: - return self.set_validators( - [*self.validators, *new_validators], - **txn_kwargs, - ) - - def remove_validators( - self, - *validators: Iterable[_ValidatorItem], - **txn_kwargs, - ) -> ReceiptAPI: - return self.set_validators( - list( - set(map(self._convert_to_address, self.validators)) - - set(map(self._convert_to_address, validators)) - ), - **txn_kwargs, - ) - - def add_token(self, token: ContractInstance | str | AddressType, **txn_kwargs) -> ReceiptAPI: - return self.contract.add_token(token, **txn_kwargs) - - def remove_token(self, token: ContractInstance | str | AddressType, **txn_kwargs) -> ReceiptAPI: - return self.contract.remove_token(token, **txn_kwargs) - - def is_accepted(self, token: ContractInstance | str | AddressType): - return self.contract.token_is_accepted(token) - - @cached_property - def MIN_STREAM_LIFE(self) -> timedelta: - return timedelta(seconds=self.contract.MIN_STREAM_LIFE()) - - def create( - self, - token: ContractInstance, - amount_per_second: str | int, - reason: HexBytes | bytes | str | dict | None = None, - start_time: datetime | int | None = None, - **txn_kwargs, - ) -> "Stream": - if not self.contract.token_is_accepted(token): - raise TokenNotAccepted(str(token)) - - if isinstance(amount_per_second, str) and "/" in amount_per_second: - value, time = amount_per_second.split("/") - amount_per_second = int( - self.conversion_manager.convert(value.strip(), int) - / time_unit_to_timedelta(time).total_seconds() - ) - - if amount_per_second == 0: - raise ValueError("`amount_per_second` must be greater than 0.") - - args: list[Any] = [token, amount_per_second] - - if reason is not None: - if isinstance(reason, dict): - reason = json.dumps(reason, separators=(",", ":")) - - if isinstance(reason, str): - reason = reason.encode("utf-8") - - args.append(reason) - - if start_time is not None: - if len(args) == 2: - args.append(b"") # Add empty reason string - - if isinstance(start_time, datetime): - args.append(int(start_time.timestamp())) - - elif isinstance(start_time, int) and start_time < 0: - args.append(self.chain_manager.pending_timestamp + start_time) - - else: - args.append(start_time) - - if sender := hasattr(token, "allowance") and txn_kwargs.get("sender"): - allowance = token.allowance(sender, self.contract) - - if allowance == 2**256 - 1: # NOTE: Sentinel value meaning "all balance" - allowance = token.balanceOf(sender) - - stream_life = allowance // amount_per_second - - if stream_life < self.MIN_STREAM_LIFE.total_seconds(): - raise StreamLifeInsufficient( - stream_life=timedelta(seconds=stream_life), - min_stream_life=self.MIN_STREAM_LIFE, - ) - - validator_args = [sender, *args[:2]] - # Arg 3 (reason) is optional - if len(args) == 3: - validator_args.append(args[2]) - else: - validator_args.append(b"") - # Skip arg 4 (start_time) - - for _validator in self.validators: - if not _validator.validate(*validator_args): - raise ValidatorFailed(_validator) - - tx = self.contract.create_stream(*args, **txn_kwargs) - event = tx.events.filter(self.contract.StreamCreated)[-1] - return Stream.from_event( - manager=self, - event=event, - is_creation_event=True, - ) - - def streams_by_creator(self, creator: AddressType) -> Iterator["Stream"]: - for stream_id in range(self.contract.num_streams(creator)): - yield Stream(manager=self, creator=creator, stream_id=stream_id) - - def all_streams(self, start_block: int | None = None) -> Iterator["Stream"]: - for stream_created_event in self.contract.StreamCreated.range( - start_block if start_block is not None else self.contract.receipt.block_number, - self.chain_manager.blocks.head.number, - ): - yield Stream.from_event( - manager=self, - event=stream_created_event, - is_creation_event=True, - ) - - def active_streams(self, start_block: int | None = None) -> Iterator["Stream"]: - for stream in self.all_streams(start_block=start_block): - if stream.is_active: - yield stream - - def unclaimed_streams(self, start_block: int | None = None) -> Iterator["Stream"]: - for stream in self.all_streams(start_block=start_block): - if not stream.is_active and stream.amount_unlocked > 0: - yield stream - - -class Stream(BaseInterfaceModel): - manager: StreamManager - creator: AddressType - stream_id: int - creation_receipt: ReceiptAPI | None = None - transaction_hash: HexBytes | None = None - - @field_validator("transaction_hash", mode="before") - def normalize_transaction_hash(cls, value: Any) -> HexBytes | None: - if value: - return HexBytes(cls.conversion_manager.convert(value, bytes)) - - return value - - @field_validator("creator", mode="before") - def validate_addresses(cls, value): - return ( - value if isinstance(value, str) else cls.conversion_manager.convert(value, AddressType) - ) - - @classmethod - def from_event( - cls, - manager: StreamManager, - event: ContractLog, - is_creation_event: bool = False, - ) -> "Stream": - return cls( - manager=manager, - creator=event.creator, - stream_id=event.stream_id, - transaction_hash=event.transaction_hash if is_creation_event else None, - ) - - def to_event(self) -> ContractLog: - return self.receipt.events.filter(self.manager.contract.StreamCreated)[0] - - @property - def contract(self) -> ContractInstance: - return self.manager.contract - - @property - def receipt(self) -> ReceiptAPI: - if self.creation_receipt: - return self.creation_receipt - - if self.transaction_hash: - receipt = self.chain_manager.get_receipt(self.transaction_hash.hex()) - self.creation_receipt = receipt - return receipt - - raise MissingCreationReceipt() - - def __repr__(self) -> str: - return ( - f"" - ) - - @property - def info(self): - return self.contract.streams(self.creator, self.stream_id) - - @cached_property - def token(self) -> ContractInstance: - if "TestToken" in self.local_project.contracts: - return self.local_project.TestToken.at(self.info.token) - - try: - return self.chain_manager.contracts.instance_at(self.info.token) - except ContractNotFoundError as err: - try: - from ape_tokens.managers import ERC20 - - return self.chain_manager.contracts.instance_at( - self.info.token, contract_type=ERC20 - ) - except ImportError: - raise err - - @cached_property - def amount_per_second(self) -> int: - return self.info.amount_per_second - - @property - def funding_rate(self) -> Decimal: - """ - Funding rate, in tokens per second, of Stream in correct decimal form. - """ - return Decimal(self.amount_per_second) / Decimal(10 ** self.token.decimals()) - - def estimate_funding(self, period: timedelta) -> int: - """ - Useful for estimating how many tokens you need to add to extend for a specific time period. - """ - return int(period.total_seconds() * self.amount_per_second) - - @cached_property - def start_time(self) -> datetime: - return datetime.fromtimestamp(self.info.start_time) - - @cached_property - def reason(self) -> HexBytes | str | dict: - try: - reason_str = self.info.reason.decode("utf-8") - - except Exception: - return self.info.reason - - try: - return json.loads(reason_str) - - except (Exception, json.JSONDecodeError): - return reason_str - - @property - def last_pull(self) -> datetime: - return datetime.fromtimestamp(self.info.last_pull) - - @property - def amount_unlocked(self) -> int: - return self.contract.amount_unlocked(self.creator, self.stream_id) - - @property - def amount_locked(self) -> int: - return self.info.funded_amount - self.amount_unlocked - - @property - def time_left(self) -> timedelta: - seconds = self.contract.time_left(self.creator, self.stream_id) - return timedelta(seconds=min(MAX_DURATION_SECONDS, seconds)) - - @property - def total_time(self) -> timedelta: - info = self.info # NOTE: Avoid calling contract twice - # NOTE: Measure time-duration of unclaimed amount remaining (locked and unlocked) - max_life = int(info.funded_amount / info.amount_per_second) - - return ( - # NOTE: `last_pull == start_time` if never pulled - datetime.fromtimestamp(info.last_pull) - - datetime.fromtimestamp(info.start_time) - + timedelta(seconds=min(MAX_DURATION_SECONDS, max_life)) - ) - - @property - def is_active(self) -> bool: - return self.time_left.total_seconds() > 0 - - @property - def add_funds(self) -> ContractTransactionHandler: - return cast( - ContractTransactionHandler, - partial(self.contract.add_funds, self.creator, self.stream_id), - ) - - @property - def is_cancelable(self) -> bool: - return self.contract.stream_is_cancelable(self.creator, self.stream_id) - - @property - def cancel(self) -> ContractTransactionHandler: - return cast( - ContractTransactionHandler, - partial(self.contract.cancel_stream, self.stream_id), - ) - - @property - def claim(self) -> ContractTransactionHandler: - if not self.amount_unlocked > 0: - raise FundsNotClaimable() - - return cast( - ContractTransactionHandler, - partial(self.contract.claim, self.creator, self.stream_id), - ) +from .manager import StreamManager +from .streams import Stream +from .validators import Validator + +# NOTE: This is required due to mutual recursion +Stream.model_rebuild() +Validator.model_rebuild() + +__all__ = [ + Stream.__name__, + StreamManager.__name__, + Validator.__name__, +] diff --git a/sdk/py/apepay/daemon.py b/sdk/py/apepay/daemon.py deleted file mode 100644 index 310ae5c4..00000000 --- a/sdk/py/apepay/daemon.py +++ /dev/null @@ -1,140 +0,0 @@ -import asyncio -import os -from datetime import timedelta -from enum import Enum - -import click -from ape.types import AddressType -from silverback import SilverbackApp - -from apepay import Stream, StreamManager - -from .settings import Settings - -settings = Settings() - - -class Status(Enum): - NORMAL = "normal" - WARNING = "warning" - CRITICAL = "critical" - INACTIVE = "inactive" - - @classmethod - def from_time_left(cls, time_left: timedelta) -> "Status": - if time_left > settings.WARNING_LEVEL: - return cls.NORMAL - - elif time_left > settings.CRITICAL_LEVEL: - return cls.WARNING - - elif time_left.total_seconds() > 0: - return cls.CRITICAL - - else: - return cls.INACTIVE - - -SM = StreamManager( - address=os.environ.get("APEPAY_CONTRACT_ADDRESS") - or click.prompt("What address to use?", type=AddressType) -) - -app = SilverbackApp() - - -async def create_task_by_status(stream: Stream): - time_left = stream.time_left - stream_status = Status.from_time_left(time_left) - - task = { - Status.NORMAL: stream_funding_normal_level, - Status.WARNING: stream_funding_warning_level, - Status.CRITICAL: stream_funding_critical_level, - Status.INACTIVE: stream_cancelled, - }[stream_status] - - if stream_status is Status.INACTIVE: - await task.kiq(stream.to_event()) - - else: - await task.kicker().with_labels(time_left=time_left).kiq(stream) - - return {"status": stream_status.value} - - -@app.on_startup() -async def app_started(state): - return await asyncio.gather( - # Start watching all active streams and claim any completed but unclaimed streams - *( - create_task_by_status(stream) - for stream in SM.all_streams() - if stream.is_active or stream.amount_unlocked > 0 - ) - ) - - -@app.on_(SM.contract.StreamCreated) -async def stream_created(event): - stream = Stream.from_event( - manager=SM, - event=event, - is_creation_event=True, - ) - - return await create_task_by_status(stream) - - -@app.broker.task(task_name="stream/normal") -async def stream_funding_normal_level(stream: Stream): - while Status.from_time_left(stream.time_left) is Status.NORMAL: - # Wait until we're in warning range - await asyncio.sleep((stream.time_left - settings.WARNING_LEVEL).total_seconds()) - - # Check if stream has been cancelled - if Status.from_time_left(stream.time_left) is Status.WARNING: - # TODO: Trigger funding warning notification - print(f"Warning: only {stream.time_left} left") - - elif Status.from_time_left(stream.time_left) is Status.CRITICAL: - # TODO: Trigger funding critical notification - print(f"Critical: only {stream.time_left} left") - - return await create_task_by_status(stream) - - -@app.broker.task(task_name="stream/warning") -async def stream_funding_warning_level(stream: Stream): - while Status.from_time_left(stream.time_left) is Status.WARNING: - # Wait for critical - await asyncio.sleep((stream.time_left - settings.CRITICAL_LEVEL).total_seconds()) - - # Check if stream has been cancelled - if Status.from_time_left(stream.time_left) is Status.CRITICAL: - # TODO: Trigger funding critical notification - print(f"Critical: only {stream.time_left} left") - - return await create_task_by_status(stream) - - -@app.broker.task(task_name="stream/critical") -async def stream_funding_critical_level(stream: Stream): - while Status.from_time_left(stream.time_left) is Status.CRITICAL: - # Wait until there's no time left - await asyncio.sleep(stream.time_left.total_seconds()) - - return await create_task_by_status(stream) - - -@app.on_(SM.contract.StreamCancelled) -async def stream_cancelled(event): - stream = Stream( - manager=SM, - creator=event.creator, - stream_id=event.stream_id, - ) - if app.signer and stream.amount_unlocked > 0: - stream.claim(sender=app.signer) - - return {"claimed": stream.amount_unlocked} diff --git a/sdk/py/apepay/manager.py b/sdk/py/apepay/manager.py new file mode 100644 index 00000000..9e080384 --- /dev/null +++ b/sdk/py/apepay/manager.py @@ -0,0 +1,299 @@ +import json +from collections.abc import Iterator +from datetime import datetime, timedelta +from functools import partial, wraps +from typing import TYPE_CHECKING, Any, Callable, Union, cast + +from ape.api import ReceiptAPI +from ape.contracts.base import ( + ContractCallHandler, + ContractEvent, + ContractInstance, + ContractTransactionHandler, +) +from ape.exceptions import ContractLogicError, DecodingError +from ape.types import AddressType, HexBytes +from ape.utils import BaseInterfaceModel, cached_property +from ape_ethereum import multicall +from pydantic import field_validator + +from .exceptions import StreamLifeInsufficient, TokenNotAccepted, ValidatorFailed +from .package import MANIFEST +from .streams import Stream +from .utils import time_unit_to_timedelta +from .validators import Validator + +if TYPE_CHECKING: + # NOTE: We really only use this for type checking, optional install + from silverback import SilverbackApp + +MAX_DURATION_SECONDS = int(timedelta.max.total_seconds()) - 1 + +_ValidatorItem = Union[Validator, ContractInstance, AddressType] + + +class StreamManager(BaseInterfaceModel): + address: AddressType + + def __init__(self, address, /, *args, **kwargs): + kwargs["address"] = address + super().__init__(*args, **kwargs) + + @field_validator("address", mode="before") + def normalize_address(cls, value: Any) -> AddressType: + return cls.conversion_manager.convert(value, AddressType) + + @property + def contract(self) -> ContractInstance: + return MANIFEST.StreamManager.at(self.address) + + def __repr__(self) -> str: + return f"" + + @property + def owner(self) -> AddressType: + return self.contract.owner() + + @property + def validators(self) -> list[Validator]: + call = multicall.Call() + [call.add(self.contract.validators, idx) for idx in range(20)] + try: + return [Validator(addr, manager=self) for addr in call() if addr is not None] + + except multicall.exceptions.UnsupportedChainError: + pass + + # Handle if multicall isn't available via brute force (e.g. local testing) + validators = [] + + for idx in range(20): # NOTE: Max of 20 validators per contract + try: + validator_address = self.contract.validators(idx) + + except (ContractLogicError, DecodingError): + # NOTE: Vyper returns no data if not a valid index + break + + validators.append(Validator(validator_address, manager=self)) + + return validators + + @property + def _parse_validator(self) -> Callable[[_ValidatorItem], Validator]: + return partial(Validator, manager=self) + + @property + def set_validators(self) -> ContractTransactionHandler: + + @wraps(self.contract.set_validators) + def order_validators(*validators: _ValidatorItem, **txn_kwargs) -> ReceiptAPI: + # NOTE: Always keep sets sorted, ensure no duplicates + return self.contract.set_validators( + sorted(v.address for v in set(map(self._parse_validator, validators))), + **txn_kwargs, + ) + + return cast(ContractTransactionHandler, order_validators) + + def add_validators(self, *new_validators: _ValidatorItem, **txn_kwargs) -> ReceiptAPI: + return self.set_validators( + *(set(self.validators) | set(map(self._parse_validator, new_validators))), + **txn_kwargs, + ) + + def remove_validators(self, *old_validators: _ValidatorItem, **txn_kwargs) -> ReceiptAPI: + return self.set_validators( + *(set(self.validators) - set(map(self._parse_validator, old_validators))), + **txn_kwargs, + ) + + @property + def add_token(self) -> ContractTransactionHandler: + return self.contract.add_token + + @property + def remove_token(self) -> ContractTransactionHandler: + return self.contract.remove_token + + @property + def is_accepted(self) -> ContractCallHandler: + return self.contract.token_is_accepted + + @cached_property + def MIN_STREAM_LIFE(self) -> timedelta: + # NOTE: Immutable in contract + return timedelta(seconds=self.contract.MIN_STREAM_LIFE()) + + def create( + self, + token: ContractInstance, + amount_per_second: str | int, + reason: HexBytes | bytes | str | dict | None = None, + start_time: datetime | int | None = None, + **txn_kwargs, + ) -> "Stream": + if not self.is_accepted(token): + raise TokenNotAccepted(str(token)) + + if isinstance(amount_per_second, str) and "/" in amount_per_second: + value, time = amount_per_second.split("/") + amount_per_second = int( + self.conversion_manager.convert(value.strip(), int) + / time_unit_to_timedelta(time).total_seconds() + ) + + if amount_per_second == 0: + raise ValueError("`amount_per_second` must be greater than 0.") + + args: list[Any] = [token, amount_per_second] + + if reason is not None: + if isinstance(reason, dict): + reason = json.dumps(reason, separators=(",", ":")) + + if isinstance(reason, str): + reason = reason.encode("utf-8") + + args.append(reason) + + if start_time is not None: + if len(args) == 2: + args.append(b"") # Add empty reason string + + if isinstance(start_time, datetime): + args.append(int(start_time.timestamp())) + + elif isinstance(start_time, int) and start_time < 0: + args.append(self.chain_manager.pending_timestamp + start_time) + + else: + args.append(start_time) + + if sender := hasattr(token, "allowance") and txn_kwargs.get("sender"): + allowance = token.allowance(sender, self.contract) + + if allowance == 2**256 - 1: # NOTE: Sentinel value meaning "all balance" + allowance = token.balanceOf(sender) + + stream_life = allowance // amount_per_second + + if stream_life < self.MIN_STREAM_LIFE.total_seconds(): + raise StreamLifeInsufficient( + stream_life=timedelta(seconds=stream_life), + min_stream_life=self.MIN_STREAM_LIFE, + ) + + validator_args = [sender, *args[:2]] + # Arg 3 (reason) is optional + if len(args) == 3: + validator_args.append(args[2]) + else: + validator_args.append(b"") + # Skip arg 4 (start_time) + + for v in self.validators: + if not v(*validator_args): + raise ValidatorFailed(v) + + tx = self.contract.create_stream(*args, **txn_kwargs) + + event = tx.events.filter(self.contract.StreamCreated)[-1] + return Stream.from_event( + manager=self, + event=event, + is_creation_event=True, + ) + + def _parse_stream_decorator(self, app: "SilverbackApp", container: ContractEvent): + + def decorator(f): + + @app.on_(container) + @wraps(f) + def inner(log): + return f(Stream(manager=self, creator=log.creator, stream_id=log.stream_id)) + + return inner + + return decorator + + def on_stream_created(self, app: "SilverbackApp"): + """ + Usage example:: + + app = SilverbackApp() + sm = StreamManager(address=...) + + sm.on_stream_created(app) + def do_something(stream): + ... # Use `stream` to update your infrastructure + """ + return self._parse_stream_decorator(app, self.contract.StreamCreated) + + def on_stream_funded(self, app: "SilverbackApp"): + """ + Usage example:: + + app = SilverbackApp() + sm = StreamManager(address=...) + + sm.on_stream_funded(app) + def do_something(stream): + ... # Use `stream` to update your infrastructure + """ + return self._parse_stream_decorator(app, self.contract.StreamFunded) + + def on_stream_claimed(self, app: "SilverbackApp"): + """ + Usage example:: + + app = SilverbackApp() + sm = StreamManager(address=...) + + sm.on_stream_claimed(app) + def do_something(stream): + ... # Use `stream` to update your infrastructure + """ + return self._parse_stream_decorator(app, self.contract.Claimed) + + def on_stream_cancelled(self, app: "SilverbackApp"): + """ + Usage example:: + + app = SilverbackApp() + sm = StreamManager(address=...) + + sm.on_stream_cancelled(app) + def do_something(stream): + ... # Use `stream` to update your infrastructure + """ + return self._parse_stream_decorator(app, self.contract.StreamCancelled) + + def streams_by_creator(self, creator: AddressType) -> Iterator["Stream"]: + for stream_id in range(self.contract.num_streams(creator)): + yield Stream(manager=self, creator=creator, stream_id=stream_id) + + def all_streams(self, start_block: int | None = None) -> Iterator["Stream"]: + if start_block is None and self.contract.creation_metadata: + start_block = self.contract.creation_metadata.block + + for stream_created_event in self.contract.StreamCreated.range( + start_block or 0, + self.chain_manager.blocks.head.number, + ): + yield Stream.from_event( + manager=self, + event=stream_created_event, + is_creation_event=True, + ) + + def active_streams(self, start_block: int | None = None) -> Iterator["Stream"]: + for stream in self.all_streams(start_block=start_block): + if stream.is_active: + yield stream + + def unclaimed_streams(self, start_block: int | None = None) -> Iterator["Stream"]: + for stream in self.all_streams(start_block=start_block): + if not stream.is_active and stream.amount_unlocked > 0: + yield stream diff --git a/sdk/py/apepay/settings.py b/sdk/py/apepay/settings.py deleted file mode 100644 index 6974dd27..00000000 --- a/sdk/py/apepay/settings.py +++ /dev/null @@ -1,34 +0,0 @@ -from datetime import timedelta -from typing import Any - -from pydantic import BaseSettings, validator - -from apepay.utils import time_unit_to_timedelta - - -class Settings(BaseSettings): - WARNING_LEVEL: timedelta = timedelta(days=2) - CRITICAL_LEVEL: timedelta = timedelta(hours=12) - - @validator("WARNING_LEVEL", "CRITICAL_LEVEL", pre=True) - def _normalize_timedelta(cls, value: Any) -> timedelta: - if isinstance(value, timedelta): - return value - - elif isinstance(value, int): - return timedelta(seconds=value) - - elif not isinstance(value, str): - raise ValueError(f"Cannot convert: {value}") - - elif ":" in value and len(value.split(":")) == 3: - h, m, s = value.split(":") - return timedelta(hours=int(h), minutes=int(m), seconds=int(s)) - - else: - multiplier, time_unit = value.split(" ") - return int(multiplier) * time_unit_to_timedelta(time_unit) - - class Config: - env_prefix = "APEPAY_" - case_sensitive = True diff --git a/sdk/py/apepay/streams.py b/sdk/py/apepay/streams.py new file mode 100644 index 00000000..9e8459e1 --- /dev/null +++ b/sdk/py/apepay/streams.py @@ -0,0 +1,188 @@ +import json +from datetime import datetime, timedelta +from decimal import Decimal +from functools import partial +from typing import TYPE_CHECKING, Any, cast + +from ape.api import ReceiptAPI +from ape.contracts.base import ContractInstance, ContractTransactionHandler +from ape.types import AddressType, ContractLog, HexBytes +from ape.utils import BaseInterfaceModel, cached_property +from pydantic import field_validator + +from .exceptions import FundsNotClaimable, MissingCreationReceipt + +if TYPE_CHECKING: + from .manager import StreamManager + +MAX_DURATION_SECONDS = int(timedelta.max.total_seconds()) - 1 + + +class Stream(BaseInterfaceModel): + manager: "StreamManager" + creator: AddressType + stream_id: int + creation_receipt: ReceiptAPI | None = None + transaction_hash: HexBytes | None = None + + @field_validator("transaction_hash", mode="before") + def normalize_transaction_hash(cls, value: Any) -> HexBytes | None: + if value: + return HexBytes(cls.conversion_manager.convert(value, bytes)) + + return value + + @field_validator("creator", mode="before") + def validate_addresses(cls, value): + return ( + value if isinstance(value, str) else cls.conversion_manager.convert(value, AddressType) + ) + + @classmethod + def from_event( + cls, + manager: "StreamManager", + event: ContractLog, + is_creation_event: bool = False, + ) -> "Stream": + return cls( + manager=manager, + creator=event.creator, + stream_id=event.stream_id, + transaction_hash=event.transaction_hash if is_creation_event else None, + ) + + def to_event(self) -> ContractLog: + return self.receipt.events.filter(self.manager.contract.StreamCreated)[0] + + @property + def contract(self) -> ContractInstance: + return self.manager.contract + + @property + def receipt(self) -> ReceiptAPI: + if self.creation_receipt: + return self.creation_receipt + + if self.transaction_hash: + receipt = self.chain_manager.get_receipt(self.transaction_hash.hex()) + self.creation_receipt = receipt + return receipt + + raise MissingCreationReceipt() + + def __repr__(self) -> str: + return ( + f"" + ) + + @property + def info(self): + return self.contract.streams(self.creator, self.stream_id) + + @cached_property + def token(self) -> ContractInstance: + try: + from ape_tokens.managers import ERC20 # type: ignore[import-not-found] + except ImportError: + ERC20 = None + + return self.chain_manager.contracts.instance_at(self.info.token, contract_type=ERC20) + + @cached_property + def amount_per_second(self) -> int: + return self.info.amount_per_second + + @property + def funding_rate(self) -> Decimal: + """ + Funding rate, in tokens per second, of Stream in correct decimal form. + """ + return Decimal(self.amount_per_second) / Decimal(10 ** self.token.decimals()) + + def estimate_funding(self, period: timedelta) -> int: + """ + Useful for estimating how many tokens you need to add to extend for a specific time period. + """ + return int(period.total_seconds() * self.amount_per_second) + + @cached_property + def start_time(self) -> datetime: + return datetime.fromtimestamp(self.info.start_time) + + @cached_property + def reason(self) -> HexBytes | str | dict: + try: + reason_str = self.info.reason.decode("utf-8") + + except Exception: + return self.info.reason + + try: + return json.loads(reason_str) + + except (Exception, json.JSONDecodeError): + return reason_str + + @property + def last_pull(self) -> datetime: + return datetime.fromtimestamp(self.info.last_pull) + + @property + def amount_unlocked(self) -> int: + return self.contract.amount_unlocked(self.creator, self.stream_id) + + @property + def amount_locked(self) -> int: + return self.info.funded_amount - self.amount_unlocked + + @property + def time_left(self) -> timedelta: + seconds = self.contract.time_left(self.creator, self.stream_id) + return timedelta(seconds=min(MAX_DURATION_SECONDS, seconds)) + + @property + def total_time(self) -> timedelta: + info = self.info # NOTE: Avoid calling contract twice + # NOTE: Measure time-duration of unclaimed amount remaining (locked and unlocked) + max_life = int(info.funded_amount / info.amount_per_second) + + return ( + # NOTE: `last_pull == start_time` if never pulled + datetime.fromtimestamp(info.last_pull) + - datetime.fromtimestamp(info.start_time) + + timedelta(seconds=min(MAX_DURATION_SECONDS, max_life)) + ) + + @property + def is_active(self) -> bool: + return self.time_left.total_seconds() > 0 + + @property + def add_funds(self) -> ContractTransactionHandler: + return cast( + ContractTransactionHandler, + partial(self.contract.add_funds, self.creator, self.stream_id), + ) + + @property + def is_cancelable(self) -> bool: + return self.contract.stream_is_cancelable(self.creator, self.stream_id) + + @property + def cancel(self) -> ContractTransactionHandler: + return cast( + ContractTransactionHandler, + partial(self.contract.cancel_stream, self.stream_id), + ) + + @property + def claim(self) -> ContractTransactionHandler: + if not self.amount_unlocked > 0: + raise FundsNotClaimable() + + return cast( + ContractTransactionHandler, + partial(self.contract.claim, self.creator, self.stream_id), + ) diff --git a/sdk/py/apepay/utils.py b/sdk/py/apepay/utils.py index b3e9fa80..3d1cbe27 100644 --- a/sdk/py/apepay/utils.py +++ b/sdk/py/apepay/utils.py @@ -7,7 +7,7 @@ def async_wrap_iter(it: Iterator) -> AsyncIterator: """Wrap blocking iterator into an asynchronous one""" loop = asyncio.get_event_loop() - q = asyncio.Queue(1) + q = asyncio.Queue(1) # type: ignore[var-annotated] exception = None _END = object() diff --git a/sdk/py/apepay/validators.py b/sdk/py/apepay/validators.py new file mode 100644 index 00000000..f2921de8 --- /dev/null +++ b/sdk/py/apepay/validators.py @@ -0,0 +1,71 @@ +from typing import TYPE_CHECKING, Any + +from ape.contracts.base import ContractInstance +from ape.exceptions import ContractLogicError +from ape.types import AddressType +from ape.utils import BaseInterfaceModel +from eth_utils import to_int +from pydantic import field_validator + +from .package import MANIFEST + +if TYPE_CHECKING: + from .manager import StreamManager + + +class Validator(BaseInterfaceModel): + """ + Wrapper class around a Validator contract that is connected with a specific + `stream_manager` on chain. + """ + + address: AddressType + manager: "StreamManager" + + def __init__(self, address: str | AddressType, /, *args, **kwargs): + kwargs["address"] = address + super().__init__(*args, **kwargs) + + @field_validator("address", mode="before") + def normalize_address(cls, value: Any) -> AddressType: + if isinstance(value, Validator): + return value.address + + return cls.conversion_manager.convert(value, AddressType) + + @property + def contract(self) -> ContractInstance: + return self.chain_manager.contracts.instance_at( + self.address, + contract_type=MANIFEST.Validator.contract_type, + ) + + def __hash__(self) -> int: + # NOTE: So `set` works + return self.address.__hash__() + + def __gt__(self, other: Any) -> bool: + # NOTE: So `sorted` works + if isinstance(other, (Validator, ContractInstance)): + return to_int(hexstr=self.address.lower()) > to_int(hexstr=other.address.lower()) + + return NotImplemented + + def __eq__(self, other: Any) -> bool: + if isinstance(other, (Validator, ContractInstance)): + return self.address == other.address + + # Try __eq__ from the other side. + return NotImplemented + + def __call__(self, *args, **kwargs) -> bool: + try: + # NOTE: Imitate that the call is coming from the specified StreamManager. + # Also note that a validator can be connected to >1 StreamManagers. + self.contract._mutable_methods_["validate"].call( + *args, sender=self.manager.address, **kwargs + ) + return True + + except ContractLogicError: + return False diff --git a/tests/conftest.py b/tests/conftest.py index 68b36992..701568e0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -65,7 +65,7 @@ def stream_manager_contract(owner, project, MIN_STREAM_LIFE, validators, tokens) @pytest.fixture(scope="session") def stream_manager(stream_manager_contract): - return StreamManager(address=stream_manager_contract) + return StreamManager(stream_manager_contract) @pytest.fixture(scope="session")