-
Notifications
You must be signed in to change notification settings - Fork 1
/
contract_deploy.py
191 lines (174 loc) · 7.17 KB
/
contract_deploy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import json
from typing import Optional
from argparse import ArgumentParser
from cyberutils.bash import execute_bash
from config import logging, CHAIN_IDS, NODE_RPC_URLS, WALLET_ADDRESSES, CONTRACT_NAMES, CODE_IDS
def build_code(
        contract_code_path: str = './cw-on-chain-registry/contracts/on-chain-registry'
) -> tuple[Optional[str], Optional[str]]:
    """
    Build the contract code with cw-optimizoor
    :param contract_code_path: path of the directory with the contract source code
    :return: the result of `execute_bash` for the build command (annotated as a pair of optional strings)
    """
    # the build has to run from inside the contract directory, hence the `cd` prefix
    _build_command = f"cd {contract_code_path} && cargo cw-optimizoor"
    _build_result = execute_bash(bash_command=_build_command, shell=True)
    return _build_result
def store_code(
        wallet_address: str,
        chain_id: str,
        note: str,
        node_rpc_url: str,
        cli_name: str = 'cyber',
        gas: int = 10_000_000,
        wasm_path: str = './cw-on-chain-registry/artifacts/on_chain_registry-aarch64.wasm') -> Optional[str]:
    """
    Store a code to a chain by a cli
    :param wallet_address: sender address
    :param chain_id: chain id
    :param note: transaction note
    :param node_rpc_url: node rpc url
    :param cli_name: cli name for bash query
    :param gas: gas limit
    :param wasm_path: path of the compiled wasm artifact to upload; the default points at the aarch64 build
        (presumably other build hosts produce a differently named file - verify before use)
    :return: code id, or None if it could not be extracted from the transaction output
    """
    # a fixed fee is added only for chain ids starting with `osmo`
    _fees_option = '--fees=30000uosmo ' if chain_id.startswith('osmo') else ''
    # build the command once so that the printed command is exactly the one being executed
    _command = (
        f"{cli_name} tx wasm store {wasm_path} "
        f"--from={wallet_address} --chain-id={chain_id} --broadcast-mode=block --note='{note}' "
        f"{_fees_option}"
        f"--gas={gas} -y -o=json --node={node_rpc_url}")
    print(_command)
    _res, _err = execute_bash(bash_command=_command, shell=True)
    try:
        # the code id is read from the attributes of the last event of the first log entry
        _attributes = json.loads(_res)['logs'][0]['events'][-1]['attributes']
        _code_ids = [item['value'] for item in _attributes if item['key'] == 'code_id']
        return _code_ids[0]
    except (IndexError, KeyError, TypeError, json.JSONDecodeError):
        # missing or non-JSON output, or an unexpected response structure: report it instead of crashing
        logging.error(_res)
        if _err:
            logging.error(_err)
        return None
def instantiate_contract(
        init_query: str,
        code_id: str,
        contract_label: str,
        from_address: str,
        chain_id: str,
        node_rpc_url: str,
        gas: int,
        cli_name: str = 'cyber',
        contract_admin: Optional[str] = None,
        amount: str = '',
        display_data: bool = False) -> Optional[str]:
    """
    Instantiate a contract by a cli
    :param init_query: Instantiate query
    :param code_id: code id
    :param contract_label: contract label
    :param from_address: sender address
    :param chain_id: chain id
    :param node_rpc_url: node rpc url
    :param gas: gas limit
    :param cli_name: cli name
    :param contract_admin: address of a contract admin, `--no-admin` is passed when it is empty
    :param amount: coins to send to a contract during instantiation
    :param display_data: display transaction data or not
    :return: contract address, or None if the transaction output could not be parsed
    """
    _amount_option = f'--amount={amount}' if amount else ''
    _admin_option = f'--admin={contract_admin}' if contract_admin else '--no-admin'
    # a fixed fee is added only for chain ids starting with `osmo`
    _fees_option = '--fees=10000uosmo' if chain_id.startswith('osmo') else ''
    # the init query goes through the INIT shell variable so that its double quotes survive;
    # empty options are skipped
    _command = ' '.join(
        _part for _part in (
            f"INIT='{init_query}'",
            f'&& {cli_name} tx wasm instantiate {code_id} "$INIT" --from={from_address}',
            _amount_option,
            f'--label="{contract_label}"',
            _admin_option,
            _fees_option,
            f'-y --gas={gas} --broadcast-mode=block -o=json --chain-id={chain_id} --node={node_rpc_url}',
        ) if _part)
    _init_output, _init_error = execute_bash(_command, shell=True)

    # parse the output only once; missing or non-JSON output is treated as a failure
    try:
        _init_json = json.loads(_init_output)
    except (TypeError, json.JSONDecodeError):
        _init_json = None

    if _init_json is None:
        logging.error(_init_output)
    elif display_data:
        logging.info(json.dumps(_init_json, indent=4, sort_keys=True))
    if _init_error:
        logging.error(_init_error)
    if _init_json is None:
        return None

    try:
        # the contract address is the value of the first attribute of the `instantiate` event
        return [event['attributes'][0]['value']
                for event in _init_json['logs'][0]['events']
                if event['type'] == 'instantiate'][0]
    except (IndexError, KeyError, TypeError):
        # no `instantiate` event or an unexpected response structure
        logging.error(_init_output)
        return None
def init_on_chain_registry_contract(
        executors_addresses: list[str],
        admins_addresses: list[str],
        wallet_address: str,
        code_id: str,
        contract_label: str,
        chain_id: str,
        node_rpc_url: str,
        gas: int = 4_000_000,
        cli_name: str = 'cyber',
        contract_admin: Optional[str] = None,
        display_data: bool = True) -> str:
    """
    Instantiate an `on-chain registry` contract by a cli
    :param executors_addresses: addresses of contract executors is specific to `on-chain registry` contract
    :param admins_addresses: addresses of contract admins is specific to `on-chain registry` contract
    :param wallet_address: address of a transaction sender
    :param code_id: code id
    :param contract_label: contract label
    :param chain_id: chain id
    :param node_rpc_url: node rpc url
    :param gas: gas limit
    :param cli_name: cli name
    :param contract_admin: address of a contract admin
    :param display_data: display transaction data or not
    :return: contract address as returned by `instantiate_contract`
    """
    # serialize with json.dumps so that empty lists become `[]` (not `[""]`) and values are escaped
    _init_query = json.dumps({
        'executors': list(executors_addresses),
        'admins': list(admins_addresses),
    })
    return instantiate_contract(
        init_query=_init_query,
        code_id=code_id,
        contract_label=contract_label,
        from_address=wallet_address,
        chain_id=chain_id,
        node_rpc_url=node_rpc_url,
        gas=gas,
        cli_name=cli_name,
        contract_admin=contract_admin,
        display_data=display_data)
if __name__ == '__main__':
    # Command line entry point: optionally build the code, store it in a chain and instantiate the contract
    parser = ArgumentParser()
    parser.add_argument("--chain_name", default='bostrom')
    parser.add_argument("--build_code", action='store_true')
    parser.add_argument("--store_code", action='store_true')
    parser.add_argument("--init_contract", action='store_true')
    args = parser.parse_args()

    chain_name = args.chain_name if args.chain_name else 'bostrom'
    # validate explicitly: an `assert` would be stripped when python runs with -O
    if chain_name not in CHAIN_IDS:
        parser.error(f'unknown chain name: {chain_name}')
    # chains whose name starts with `osmosis` are handled by `osmosisd`, all the others by `cyber`
    cli_name = 'osmosisd' if chain_name.startswith('osmosis') else 'cyber'

    if args.build_code:
        build_code()
        logging.info('the code has been built')

    if args.store_code:
        code_id = store_code(
            wallet_address=WALLET_ADDRESSES[chain_name],
            note=CONTRACT_NAMES[chain_name],
            chain_id=CHAIN_IDS[chain_name],
            node_rpc_url=NODE_RPC_URLS[chain_name],
            cli_name=cli_name
        )
        if code_id:
            logging.info(f'the code has been stored in {chain_name}, code id {code_id}.\nplease update `config.py`')
        elif args.init_contract:
            # without a code id there is nothing to instantiate
            logging.error(f'the code has not been stored in {chain_name}, the contract instantiation is skipped')
            raise SystemExit(1)
    else:
        code_id = CODE_IDS[chain_name]

    if args.init_contract:
        contract_address = init_on_chain_registry_contract(
            executors_addresses=[WALLET_ADDRESSES[chain_name]],
            admins_addresses=[WALLET_ADDRESSES[chain_name]],
            wallet_address=WALLET_ADDRESSES[chain_name],
            code_id=code_id,
            contract_label=CONTRACT_NAMES[chain_name],
            chain_id=CHAIN_IDS[chain_name],
            node_rpc_url=NODE_RPC_URLS[chain_name],
            cli_name=cli_name
        )
        if contract_address:
            logging.info(f'the contract has been instantiated in {chain_name} network, '
                         f'contract address {contract_address}.\nplease update `config.py`')