Skip to content

Commit

Permalink
Merge pull request #15 from Snedashkovsky/multiprocess_extractor
Browse files Browse the repository at this point in the history
Multiprocess extraction
  • Loading branch information
Snedashkovsky authored Jul 28, 2023
2 parents 582ec9d + 4658279 commit e05c756
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 58 deletions.
53 changes: 34 additions & 19 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
extract:
default: help

all : help extract export run_notebook update build_code store_code init_contract deploy_contract
.PHONY : all

help: # show help for each of the makefile recipes
@grep -E '^[a-zA-Z0-9 -_]+:.*#' Makefile | while read -r l; do printf "\033[1;32m$$(echo $$l | cut -f 1 -d':')\033[00m:$$(echo $$l | cut -f 2- -d'#')\n"; done

extract: # extract metadata from node apis
@echo "pull chain-registry"
git submodule init
git submodule update --remote
@echo "extract data"
python3 asset_data.py --extract=True --export=False
export:
@echo "export data"
python3 asset_data.py --extract=False --export=True
run_notebook:
python3 asset_data.py --extract --export

export: # export metadata to jsons, csv and contracts
@echo "export metadata to jsons, csv and contracts"
python3 asset_data.py --export

run_notebook: # run asset_data.ipynb notebook
jupyter nbconvert --to=notebook --inplace --execute asset_data.ipynb
update: extract export run_notebook
build_code:
@echo "build code"
python3 contract_deploy.py --build_code=True
store_code:
@echo "store code"
python3 contract_deploy.py --chain_name=$(chain_name) --store_code=True
init_contract:
@echo "instantiate contract"
python3 contract_deploy.py --chain_name=$(chain_name) --init_contract=True
deploy_contract:
@echo "build a code, store a code and instantiate a contract"
python3 contract_deploy.py --chain_name=$(chain_name) --build_code=True --store_code=True --init_contract=True

update: extract export run_notebook # extract from node apis and export metadata, run asset_data.ipynb notebook

build_code: # build cw-on-chain-registry code
@echo "build cw-on-chain-registry code"
python3 contract_deploy.py --build_code

store_code: # store cw-on-chain-registry code to a chain
@echo "store cw-on-chain-registry code"
python3 contract_deploy.py --chain_name=$(chain_name) --store_code

init_contract: # instantiate a contract in a chain
@echo "instantiate a contract"
python3 contract_deploy.py --chain_name=$(chain_name) --init_contract

deploy_contract: # build and store cw-on-chain-registry code, instantiate a contract from it
@echo "build and store cw-on-chain-registry code, instantiate a contract from it"
python3 contract_deploy.py --chain_name=$(chain_name) --build_code --store_code --init_contract
39 changes: 22 additions & 17 deletions asset_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import ast
from argparse import ArgumentParser
from multiprocessing import Pool

import pandas as pd
from warnings import filterwarnings
Expand Down Expand Up @@ -180,7 +181,11 @@ def save_to_json(
fp=all_assets_file, ensure_ascii=False, indent=4)


def run_extract() -> None:
def extract_assets_star(args):
return extract_assets(*args)


def run_extract(number_of_treads: int = 10) -> None:
"""
Extract asset metadata and store it to intermediate csv files
:return: none
Expand All @@ -190,14 +195,15 @@ def run_extract() -> None:
logging.info(msg=f'start extraction {len(_chain_id_name_dict.keys()):>,} chains')

# extract asset data from lcd apis
for _chain_id, _node_lcd_url_list in list(_chain_id_lcd_dict.items()):
logging.info(_chain_id)
_asset_df = extract_assets(_chain_id, _node_lcd_url_list)
if _asset_df is None:
logging.info(f'data has not been loaded for {_chain_id}, lcd apis not work')
continue
_asset_df.to_csv(f'data_csv/assets_{_chain_id}.csv')
logging.info(msg=f'extract {len(_asset_df):>,} assets for `{_chain_id}` chain_id')
_tasks = [[_chain_id, _node_lcd_url_list] for _chain_id, _node_lcd_url_list in _chain_id_lcd_dict.items()]
logging.info(f'lcd extract. first task: {_tasks[0][0]} last task: {_tasks[-1][0]} total tasks: {len(_tasks)} '
f'threads: {number_of_treads:>,}')
with Pool(processes=number_of_treads) as pool:
_res = list(tqdm(pool.imap(extract_assets_star, _tasks, 1), total=len(_tasks)))
logging.info(
f'! extracted chains {sum(_res)} not extracted {len(_tasks) - sum(_res)} total {len(_tasks)}.'
f'not extracted: {", ".join([_item[0] for _i, _item in enumerate(_tasks) if _res[_i] == False])}'
)


def run_export() -> None:
Expand All @@ -223,20 +229,19 @@ def run_export() -> None:
wallet_address=WALLET_ADDRESSES[_chain_name],
fee_denom=FEE_DENOMS[_chain_name],
)
logging.info(msg=f'exported {len(_assets_df):>,} assets for {len(set(_assets_df.chain_id.to_list()))} chains')
logging.info(msg=f'! exported {len(_assets_df):>,} assets for {len(set(_assets_df.chain_id.to_list()))} chains')


if __name__ == '__main__':

parser = ArgumentParser()
parser.add_argument("--extract", default=True)
parser.add_argument("--export", default=True)
parser.add_argument("--extract", action='store_true')
parser.add_argument("--export", action='store_true')
args = parser.parse_args()

extract_bool = bool(args.extract)
export_bool = bool(args.export)

if extract_bool:
if args.extract:
logging.info('! start extraction')
run_extract()
if export_bool:
if args.export:
logging.info('! start export')
run_export()
16 changes: 6 additions & 10 deletions contract_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,19 @@ def init_on_chain_registry_contract(

parser = ArgumentParser()
parser.add_argument("--chain_name", default='bostrom')
parser.add_argument("--build_code", default=False)
parser.add_argument("--store_code", default=False)
parser.add_argument("--init_contract", default=False)
parser.add_argument("--build_code", action='store_true')
parser.add_argument("--store_code", action='store_true')
parser.add_argument("--init_contract", action='store_true')
args = parser.parse_args()

chain_name = args.chain_name if args.chain_name else 'bostrom'
build_code_bool = bool(args.build_code)
store_code_bool = bool(args.store_code)
init_contract_bool = bool(args.init_contract)

assert chain_name in CHAIN_IDS.keys()

if build_code_bool:
if args.build_code:
build_code()
logging.info(f'the code has been built')

if store_code_bool:
if args.store_code:
code_id = store_code(
wallet_address=WALLET_ADDRESSES[chain_name],
note=CONTRACT_NAMES[chain_name],
Expand All @@ -175,7 +171,7 @@ def init_on_chain_registry_contract(
else:
code_id = CODE_IDS[chain_name]

if init_contract_bool:
if args.init_contract:
contract_address = init_on_chain_registry_contract(
executors_addresses=[WALLET_ADDRESSES[chain_name]],
admins_addresses=[WALLET_ADDRESSES[chain_name]],
Expand Down
28 changes: 16 additions & 12 deletions src/lcd_extractor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import pandas as pd
from warnings import filterwarnings
from tqdm import tqdm
import requests
from typing import Optional, Union
import json
Expand All @@ -12,7 +11,6 @@


filterwarnings('ignore')
tqdm.pandas()


def get_denom_info(denom: str, node_lcd_url: str) -> [str, Optional[str]]:
Expand All @@ -28,7 +26,7 @@ def get_denom_info(denom: str, node_lcd_url: str) -> [str, Optional[str]]:
'denom_trace']
return str(_res_json['base_denom']), _res_json['path']
except Exception as _e:
logging.error(f'Error: {_e}')
logging.error(f'Not found denom {denom} node lcd url {node_lcd_url}. Error: {_e}')
return str(denom), 'Not found'
else:
return str(denom), None
Expand Down Expand Up @@ -116,7 +114,7 @@ def _get_counterparty_chain_id(channel: str, port_id: str, node_lcd_url: str) ->
f'channel {channel}, node_lcd_url {node_lcd_url}, port_id {port_id}')
return None
return {_channel: _get_counterparty_chain_id(channel=_channel, port_id=port_id, node_lcd_url=node_lcd_url)
for _channel in tqdm(channels)}
for _channel in channels}


def get_cw20_token_info(contract_address: str,
Expand All @@ -139,9 +137,9 @@ def get_cw20_token_info(contract_address: str,
if 'data' in _res.keys():
return _res['data']
if 'code' in _res.keys():
logging.error(f'{contract_address} {node_lcd_url} Not Implemented')
logging.error(f'contract address {contract_address} node lcd url {node_lcd_url}. Not Implemented')
return _res
logging.error(f'{contract_address} {node_lcd_url} Error {_res}')
logging.error(f'contract address {contract_address} node lcd url {node_lcd_url}. Error {_res}')
return {}


Expand All @@ -168,7 +166,7 @@ def get_assets(chain_id: str,
_assets_df = _assets_supply_df

_assets_df.loc[:, ['denom_base', 'path']] = \
_assets_df.progress_apply(
_assets_df.apply(
lambda row: pd.Series(
data=get_denom_info(
denom=row['denom'],
Expand Down Expand Up @@ -205,7 +203,7 @@ def get_assets(chain_id: str,
return _assets_df


def extract_assets(chain_id: str, node_lcd_url_list: list[str]) -> Optional[pd.DataFrame]:
def extract_assets(chain_id: str, node_lcd_url_list: list[str]) -> bool:
"""
Get dataframe with assets data for a given network by lcd list
:param chain_id: network chain id
Expand All @@ -215,11 +213,17 @@ def extract_assets(chain_id: str, node_lcd_url_list: list[str]) -> Optional[pd.D
_asset_df = None
for _node_lcd_url in node_lcd_url_list[::-1]:
try:
logging.info(f'node lcd url: {_node_lcd_url}')
logging.info(f'extract lcd for chain id: {chain_id} node lcd url: {_node_lcd_url}')
_asset_df = get_assets(chain_id=chain_id, node_lcd_url=_node_lcd_url)
break
except (ConnectionError, ReadTimeout, TimeoutError, json.JSONDecodeError):
logging.error(f'no connection for {chain_id} to lcd {_node_lcd_url}')
except (ConnectionError, ReadTimeout, TimeoutError, json.JSONDecodeError) as e:
logging.error(f'no connection for {chain_id} to lcd {_node_lcd_url}. Error: {e}')
except Exception as e:
logging.error(f'no connection for {chain_id} to lcd {_node_lcd_url}. Error: {e}')
return _asset_df

if _asset_df is None:
logging.info(f'data has not been loaded for {chain_id}, lcd apis not work')
return False
_asset_df.to_csv(f'data_csv/assets_{chain_id}.csv')
logging.info(msg=f'extracted {len(_asset_df):>,} assets for chain_id: `{chain_id}` node lcd url: {_node_lcd_url}')
return True

0 comments on commit e05c756

Please sign in to comment.