Skip to content

Commit

Permalink
refactor: larger refactor of stream manager
Browse files Browse the repository at this point in the history
- `Stream.stream_id -> Stream.id`
- reordering/renaming of some event arguments
- reordering of `create_stream` function
- remove unused functions
  • Loading branch information
fubuloubu committed Sep 25, 2024
1 parent 335eaa8 commit 955b367
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 255 deletions.
18 changes: 6 additions & 12 deletions bots/example.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
from collections import defaultdict

from ape.types import AddressType
from silverback import SilverbackApp

from apepay import Stream, StreamManager
Expand All @@ -12,37 +10,33 @@

# NOTE: You would probably want to index your db by network and deployment address,
# if you were operating on multiple networks and/or deployments (for easy lookup)
db: defaultdict[AddressType, list[Stream]] = defaultdict(list)
db: dict[int, Stream] = dict()
# TODO: Migrate to `app.state.db` when feature becomes available


@app.on_startup()
async def load_db(_):
for stream in sm.active_streams():
while len(db[stream.creator]) < stream.stream_id:
db[stream.creator].append(None) # Fill with empty values
assert stream.stream_id == len(db[stream.creator])
db[stream.creator].append(stream)
db[stream.id] = stream


@sm.on_stream_created(app)
async def grant_product(stream):
assert stream.stream_id == len(db[stream.creator])
db[stream.creator].append(stream)
print(f"provisioning product for {stream.creator}")
db[stream.id] = stream
print(f"provisioning products: {stream.products}")
return stream.time_left


@sm.on_stream_funded(app)
async def update_product_funding(stream):
# NOTE: properties of stream have changed, you may not need to handle this, but typically you
# would want to update `stream.time_left` in db for use in user Stream life notifications
db[stream.creator][stream.stream_id] = stream
db[stream.id] = stream
return stream.time_left


@sm.on_stream_cancelled(app)
async def revoke_product(stream):
print(f"unprovisioning product for {stream.creator}")
db[stream.creator][stream.stream_id] = None
db[stream.id] = None
return stream.time_left
79 changes: 19 additions & 60 deletions contracts/StreamManager.vy
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ capabilities: public(HashMap[address, Ability])


event StreamCreated:
stream_id: indexed(uint256)
id: indexed(uint256)
owner: indexed(address)
token: indexed(IERC20)
amount_per_second: uint256
Expand All @@ -82,29 +82,29 @@ event StreamCreated:


event StreamOwnershipUpdated:
stream_id: indexed(uint256)
id: indexed(uint256)
old: indexed(address)
new: indexed(address)


event StreamFunded:
stream_id: indexed(uint256)
id: indexed(uint256)
funder: indexed(address)
amount_added: uint256
added: uint256


event StreamClaimed:
stream_id: indexed(uint256)
id: indexed(uint256)
claimer: indexed(address)
stream_exhausted: indexed(bool)
claimed_amount: uint256
exhausted: indexed(bool)
claimed: uint256


event StreamCancelled:
stream_id: indexed(uint256)
cancellor: indexed(address)
id: indexed(uint256)
canceller: indexed(address)
reason: indexed(bytes32)
amount_refunded: uint256
refunded: uint256


@deploy
Expand Down Expand Up @@ -171,8 +171,8 @@ def create_stream(
token: IERC20,
amount_per_second: uint256,
products: DynArray[bytes32, MAX_PRODUCTS] = [],
start_time: uint256 = block.timestamp,
max_funding: uint256 = max_value(uint256),
start_time: uint256 = block.timestamp,
) -> uint256:
assert self.token_is_accepted[token] # dev: token not accepted
assert start_time <= block.timestamp # dev: start time in future
Expand All @@ -197,7 +197,9 @@ def create_stream(
# Ensure stream life parameters are acceptable
assert max_stream_life >= funded_amount // amount_per_second # dev: max stream life too small

prefunded_stream_life: uint256 = max(MIN_STREAM_LIFE, block.timestamp - start_time)
prefunded_stream_life: uint256 = max(
MIN_STREAM_LIFE, block.timestamp - start_time # dev: start_time in future
)
assert max_stream_life >= prefunded_stream_life # dev: prefunded stream life too large
assert funded_amount >= prefunded_stream_life * amount_per_second # dev: not enough funds

Expand Down Expand Up @@ -284,15 +286,15 @@ def fund_stream(stream_id: uint256, amount: uint256) -> uint256:


@view
def _stream_is_cancelable(creator: address, stream_id: uint256) -> bool:
# Creator needs to wait `MIN_STREAM_LIFE` to cancel a stream
return self.streams[creator][stream_id].start_time + MIN_STREAM_LIFE <= block.timestamp
def _stream_is_cancelable(stream_id: uint256) -> bool:
# Stream owner needs to wait `MIN_STREAM_LIFE` to cancel a stream
return block.timestamp - self.streams[stream_id].start_time >= MIN_STREAM_LIFE


@view
@external
def stream_is_cancelable(creator: address, stream_id: uint256) -> bool:
return self._stream_is_cancelable(creator, stream_id)
def stream_is_cancelable(stream_id: uint256) -> bool:
return self._stream_is_cancelable(stream_id)


@external
Expand All @@ -306,54 +308,11 @@ def claim_stream(stream_id: uint256) -> uint256:
token: IERC20 = self.streams[stream_id].token
assert extcall token.transfer(self.controller, claim_amount, default_return_value=True)


@external
def cancel_stream(
stream_id: uint256,
reason: Bytes[MAX_REASON_SIZE] = b"",
creator: address = msg.sender,
) -> uint256:
if msg.sender == creator:
assert self._stream_is_cancelable(creator, stream_id)

else:
# Owner can cancel at any time
assert msg.sender == self.owner

funded_amount: uint256 = self.streams[creator][stream_id].funded_amount
amount_locked: uint256 = funded_amount - self._amount_unlocked(creator, stream_id)
assert amount_locked > 0 # NOTE: reverts if stream doesn't exist, or already cancelled
self.streams[creator][stream_id].funded_amount = funded_amount - amount_locked

token: IERC20 = self.streams[creator][stream_id].token
assert extcall token.transfer(creator, amount_locked, default_return_value=True)

log StreamCancelled(creator, stream_id, amount_locked, reason)

return funded_amount - amount_locked


@external
def claim(creator: address, stream_id: uint256) -> uint256:
funded_amount: uint256 = self.streams[creator][stream_id].funded_amount
claim_amount: uint256 = self._amount_unlocked(creator, stream_id)
self.streams[creator][stream_id].funded_amount = funded_amount - claim_amount
self.streams[creator][stream_id].last_pull = block.timestamp

token: IERC20 = self.streams[creator][stream_id].token
assert extcall token.transfer(self.owner, claim_amount, default_return_value=True)

log StreamClaimed(stream_id, msg.sender, funded_amount == claim_amount, claim_amount)

return claim_amount


@view
@external
def stream_is_cancelable(stream_id: uint256) -> bool:
return block.timestamp - self.streams[stream_id].start_time >= MIN_STREAM_LIFE


@external
def cancel_stream(stream_id: uint256, reason: bytes32 = empty(bytes32)) -> uint256:
stream_owner: address = self.streams[stream_id].owner
Expand Down
19 changes: 12 additions & 7 deletions scripts/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def cli(
token = cli_ctx.local_project.TestToken.deploy(sender=deployer)
sm = StreamManager(
cli_ctx.local_project.StreamManager.deploy(
deployer, min_stream_life, [], [token], sender=deployer
deployer, min_stream_life, [token], [], sender=deployer
)
)

Expand Down Expand Up @@ -64,24 +64,24 @@ def cli(

# Do a little garbage collection
for stream in streams[payer.address]:
click.echo(f"{payer}:{stream.stream_id} - {stream.time_left}")
click.echo(f"Stream '{stream.id}' - {stream.time_left}")
if not stream.is_active:
click.echo(f"Stream '{payer}:{stream.stream_id}' is expired, removing...")
click.echo(f"Stream '{stream.id}' is expired, removing...")
streams[payer.address].remove(stream)

if len(streams[payer.address]) > 0:
stream = random.choice(streams[payer.address])

if token.balanceOf(payer) >= 10 ** (decimals + 1) and random.random() < fund_stream:
click.echo(
f"Stream '{payer}:{stream.stream_id}' is being funded "
f"Stream '{stream.id}' is being funded "
f"w/ {funding_amount / 10**decimals:.2f} tokens..."
)
token.approve(sm.address, funding_amount, sender=payer)
stream.add_funds(funding_amount, sender=payer)

elif random.random() < cancel_stream:
click.echo(f"Stream '{payer}:{stream.stream_id}' is being cancelled...")
click.echo(f"Stream '{stream.id}' is being cancelled...")
stream.cancel(sender=payer)
streams[payer.address].remove(stream)

Expand All @@ -91,6 +91,11 @@ def cli(
elif len(streams[payer.address]) < max_streams and random.random() < create_stream:
click.echo(f"'{payer}' is creating a new stream...")
token.approve(sm.address, starting_tokens, sender=payer)
stream = sm.create(token, int(starting_tokens / starting_life), sender=payer)
stream = sm.create(
token,
int(starting_tokens / starting_life),
max_funding=starting_tokens,
sender=payer,
)
streams[payer.address].append(stream)
click.echo(f"Stream '{payer}:{stream.stream_id}' was created successfully.")
click.echo(f"Stream '{stream.id}' was created successfully.")
26 changes: 10 additions & 16 deletions scripts/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,25 @@ def cli():

@cli.command(cls=ConnectedProviderCommand)
@network_option()
@click.option("--start-block", type=int)
@click.argument("address", type=AddressType)
def unclaimed(network, start_block, address):
@click.argument("manager", type=StreamManager)
def unclaimed(manager):
"""List all unclaimed streams"""

sm = StreamManager(address=address)
for stream in sm.unclaimed_streams(start_block=start_block):
click.echo(
f"{stream.creator}/{stream.stream_id}: "
f"{stream.amount_unlocked / 10 ** stream.token.decimals()} "
f"{stream.token.symbol()}"
)
for stream in manager.unclaimed_streams():
stream_balance = stream.amount_unlocked / 10 ** stream.token.decimals()
click.echo(f"{stream.id}: {stream_balance} {stream.token.symbol()}")


@cli.command(cls=ConnectedProviderCommand)
@network_option()
@account_option()
@click.option("--start-block", type=int)
@click.option("--batch-size", type=int, default=256)
@click.option("--multicall/--no-multicall", "use_multicall", default=True)
@click.argument("address", type=AddressType)
def claim(account, start_block, batch_size, use_multicall, address):
@click.argument("manager", type=StreamManager)
def claim(account, batch_size, use_multicall, manager):
"""Claim unclaimed streams using multicall (anyone can claim)"""

sm = StreamManager(address=address)
unclaimed_streams = sm.unclaimed_streams(start_block=start_block)
unclaimed_streams = manager.unclaimed_streams()

if not use_multicall:
for _ in range(batch_size):
Expand All @@ -55,6 +48,7 @@ def claim(account, start_block, batch_size, use_multicall, address):
click.echo(f"INFO: {len(list(unclaimed_streams))} more claims needed...")
return

# else: use multicall
more_streams = True

while more_streams:
Expand All @@ -67,7 +61,7 @@ def claim(account, start_block, batch_size, use_multicall, address):
more_streams = False
break

tx.add(sm.contract.claim, stream.creator, stream.stream_id)
tx.add(manager.contract.claim_stream, stream.id)

try:
tx(sender=account)
Expand Down
Loading

0 comments on commit 955b367

Please sign in to comment.