Skip to content

Commit

Permalink
Seperate stream data into chunks (#100)
Browse files Browse the repository at this point in the history
* Stream deconstruct WIP

* Minimize stream storage load

* Fix comment
  • Loading branch information
orkunkl authored Dec 3, 2024
1 parent 4da540d commit 2225efd
Show file tree
Hide file tree
Showing 12 changed files with 503 additions and 386 deletions.
66 changes: 47 additions & 19 deletions contracts/stream/src/circuit_ops.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::helpers::build_u128_bank_send_msg;
use crate::pool::pool_refund;
use crate::state::{CONTROLLER_PARAMS, POSITIONS, STREAM};
use crate::state::{CONTROLLER_PARAMS, POSITIONS, POST_STREAM, STREAM_INFO, STREAM_STATE};
use crate::stream::{sync_stream, sync_stream_status};
use crate::ContractError;
use cosmwasm_std::{attr, BankMsg, CosmosMsg, DepsMut, Env, MessageInfo, Response, Timestamp};
Expand All @@ -13,7 +13,7 @@ pub fn execute_exit_cancelled(
env: Env,
info: MessageInfo,
) -> Result<Response, ContractError> {
let mut stream = STREAM.load(deps.storage)?;
let mut stream = STREAM_STATE.load(deps.storage)?;

let mut position = POSITIONS.load(deps.storage, &info.sender)?;
if position.owner != info.sender {
Expand Down Expand Up @@ -82,7 +82,7 @@ pub fn execute_cancel_stream(
if controller_params.protocol_admin != info.sender {
return Err(ContractError::Unauthorized {});
}
let mut stream = STREAM.load(deps.storage)?;
let mut stream = STREAM_STATE.load(deps.storage)?;
sync_stream_status(&mut stream, env.block.time);

if stream.is_finalized() || stream.is_cancelled() {
Expand All @@ -93,19 +93,28 @@ pub fn execute_cancel_stream(
stream.status_info.status = Status::Cancelled;

sync_stream(&mut stream, env.block.time);
STREAM.save(deps.storage, &stream)?;
STREAM_STATE.save(deps.storage, &stream)?;

// Refund all out tokens to stream creator(treasury)
let mut refund_coins = vec![stream.out_asset.clone()];
let pool_refund_coins = pool_refund(&deps, stream.pool_config, stream.out_asset.denom.clone())?;

refund_coins.extend(pool_refund_coins);
// refund pool creation if any
let post_stream_ops = POST_STREAM.may_load(deps.storage)?;
if let Some(post_stream_ops) = post_stream_ops {
let pool_refund_coins = pool_refund(
&deps,
post_stream_ops.pool_config,
stream.out_asset.denom.clone(),
)?;
refund_coins.extend(pool_refund_coins);
}

let stream_info = STREAM_INFO.load(deps.storage)?;
let funds_msgs: Vec<CosmosMsg> = refund_coins
.iter()
.map(|coin| {
CosmosMsg::Bank(BankMsg::Send {
to_address: stream.treasury.to_string(),
to_address: stream_info.treasury.to_string(),
amount: vec![coin.clone()],
})
})
Expand All @@ -122,9 +131,10 @@ pub fn execute_cancel_stream_with_threshold(
env: Env,
info: MessageInfo,
) -> Result<Response, ContractError> {
let mut stream = STREAM.load(deps.storage)?;
let mut stream = STREAM_STATE.load(deps.storage)?;
let stream_info = STREAM_INFO.load(deps.storage)?;
// Only stream creator can cancel the stream with threshold not reached
if info.sender != stream.stream_admin {
if info.sender != stream_info.stream_admin {
return Err(ContractError::Unauthorized {});
}
sync_stream_status(&mut stream, env.block.time);
Expand All @@ -150,19 +160,27 @@ pub fn execute_cancel_stream_with_threshold(

stream.status_info.status = Status::Cancelled;

STREAM.save(deps.storage, &stream)?;
STREAM_STATE.save(deps.storage, &stream)?;

// Refund all out tokens to stream creator(treasury)
let mut refund_coins = vec![stream.out_asset.clone()];
let pool_refund_coins = pool_refund(&deps, stream.pool_config, stream.out_asset.denom.clone())?;

refund_coins.extend(pool_refund_coins);
// refund pool creation if any
let post_stream_ops = POST_STREAM.may_load(deps.storage)?;
if let Some(post_stream_ops) = post_stream_ops {
let pool_refund_coins = pool_refund(
&deps,
post_stream_ops.pool_config,
stream.out_asset.denom.clone(),
)?;
refund_coins.extend(pool_refund_coins);
}

let funds_msgs: Vec<CosmosMsg> = refund_coins
.iter()
.map(|coin| {
CosmosMsg::Bank(BankMsg::Send {
to_address: stream.treasury.to_string(),
to_address: stream_info.treasury.to_string(),
amount: vec![coin.clone()],
})
})
Expand All @@ -178,9 +196,10 @@ pub fn execute_stream_admin_cancel(
env: Env,
info: MessageInfo,
) -> Result<Response, ContractError> {
let mut stream = STREAM.load(deps.storage)?;
let mut stream = STREAM_STATE.load(deps.storage)?;
let stream_info = STREAM_INFO.load(deps.storage)?;
// Only stream admin can cancel the stream with this method
if info.sender != stream.stream_admin {
if info.sender != stream_info.stream_admin {
return Err(ContractError::Unauthorized {});
}

Expand All @@ -194,18 +213,27 @@ pub fn execute_stream_admin_cancel(
}
stream.status_info.status = Status::Cancelled;
sync_stream(&mut stream, env.block.time);
STREAM.save(deps.storage, &stream)?;
STREAM_STATE.save(deps.storage, &stream)?;

// Refund all out tokens to stream creator(treasury)
let mut refund_coins = vec![stream.out_asset.clone()];
let pool_refund_coins = pool_refund(&deps, stream.pool_config, stream.out_asset.denom.clone())?;

refund_coins.extend(pool_refund_coins);
// refund pool creation if any
let post_stream_ops = POST_STREAM.may_load(deps.storage)?;
if let Some(post_stream_ops) = post_stream_ops {
let pool_refund_coins = pool_refund(
&deps,
post_stream_ops.pool_config,
stream.out_asset.denom.clone(),
)?;
refund_coins.extend(pool_refund_coins);
}

let funds_msgs: Vec<CosmosMsg> = refund_coins
.iter()
.map(|coin| {
CosmosMsg::Bank(BankMsg::Send {
to_address: stream.treasury.to_string(),
to_address: stream_info.treasury.to_string(),
amount: vec![coin.clone()],
})
})
Expand Down
Loading

0 comments on commit 2225efd

Please sign in to comment.