Skip to content

Commit

Permalink
Merge pull request hummingbot#6760 from aarmoa/feat/injective_chain_s…
Browse files Browse the repository at this point in the history
…treams

Feat/injective chain streams
  • Loading branch information
cardosofede authored Jan 12, 2024
2 parents 0ceab3d + 593e813 commit 1809b40
Show file tree
Hide file tree
Showing 33 changed files with 3,884 additions and 3,949 deletions.
2 changes: 2 additions & 0 deletions bin/hummingbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
load_client_config_map_from_file,
write_config_to_yml,
)
from hummingbot.client.config.security import Security
from hummingbot.client.hummingbot_application import HummingbotApplication
from hummingbot.client.settings import AllConnectorSettings
from hummingbot.client.ui import login_prompt
Expand Down Expand Up @@ -53,6 +54,7 @@ async def ui_start_handler(self):


async def main_async(client_config_map: ClientConfigAdapter):
await Security.wait_til_decryption_done()
await create_yml_files_legacy()

# This init_logging() call is important, to skip over the missing config warnings.
Expand Down
10 changes: 10 additions & 0 deletions hummingbot/client/config/security.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
from pathlib import Path
from typing import Dict, Optional

Expand All @@ -16,6 +17,7 @@
)
from hummingbot.core.utils.async_call_scheduler import AsyncCallScheduler
from hummingbot.core.utils.async_utils import safe_ensure_future
from hummingbot.logger import HummingbotLogger


class Security:
Expand All @@ -24,6 +26,14 @@ class Security:
_secure_configs = {}
_decryption_done = asyncio.Event()

_logger: Optional[HummingbotLogger] = None

@classmethod
def logger(cls) -> HummingbotLogger:
if cls._logger is None:
cls._logger = logging.getLogger(__name__)
return cls._logger

@staticmethod
def new_password_required() -> bool:
return not PASSWORD_VERIFICATION_PATH.exists()
Expand Down
5 changes: 3 additions & 2 deletions hummingbot/connector/client_order_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,9 @@ def _trigger_failure_event(self, order: InFlightOrder):
)

def _trigger_order_creation(self, tracked_order: InFlightOrder, previous_state: OrderState, new_state: OrderState):
if previous_state == OrderState.PENDING_CREATE and new_state not in [OrderState.CANCELED, OrderState.FAILED,
OrderState.PENDING_CANCEL]:
if (previous_state == OrderState.PENDING_CREATE and
previous_state != new_state and
new_state not in [OrderState.CANCELED, OrderState.FAILED, OrderState.PENDING_CANCEL]):
self.logger().info(tracked_order.build_order_created_message())
self._trigger_created_event(tracked_order)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
DEFAULT_DOMAIN = ""
TESTNET_DOMAIN = "testnet"

MAX_ORDER_ID_LEN = CONSTANTS.MAX_ORDER_ID_LEN
HBOT_ORDER_ID_PREFIX = CONSTANTS.HBOT_ORDER_ID_PREFIX

TRANSACTIONS_CHECK_INTERVAL = CONSTANTS.TRANSACTIONS_CHECK_INTERVAL

ORDER_STATE_MAP = CONSTANTS.ORDER_STATE_MAP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ def domain(self) -> str:

@property
def client_order_id_max_length(self) -> int:
return None
return CONSTANTS.MAX_ORDER_ID_LEN

@property
def client_order_id_prefix(self) -> str:
return ""
return CONSTANTS.HBOT_ORDER_ID_PREFIX

@property
def trading_rules_request_path(self) -> str:
Expand Down Expand Up @@ -704,37 +704,14 @@ async def _user_stream_event_listener(self):
await self._check_created_orders_status_for_transaction(transaction_hash=transaction_hash)
elif channel == "trade":
trade_update = event_data
tracked_order = self._order_tracker.all_fillable_orders_by_exchange_order_id.get(
trade_update.exchange_order_id
)
if tracked_order is not None:
new_trade_update = TradeUpdate(
trade_id=trade_update.trade_id,
client_order_id=tracked_order.client_order_id,
exchange_order_id=trade_update.exchange_order_id,
trading_pair=trade_update.trading_pair,
fill_timestamp=trade_update.fill_timestamp,
fill_price=trade_update.fill_price,
fill_base_amount=trade_update.fill_base_amount,
fill_quote_amount=trade_update.fill_quote_amount,
fee=trade_update.fee,
is_taker=trade_update.is_taker,
)
self._order_tracker.process_trade_update(new_trade_update)
self._order_tracker.process_trade_update(trade_update)
elif channel == "order":
order_update = event_data
tracked_order = self._order_tracker.all_updatable_orders_by_exchange_order_id.get(
order_update.exchange_order_id)
tracked_order = self._order_tracker.all_updatable_orders.get(order_update.client_order_id)
if tracked_order is not None:
new_order_update = OrderUpdate(
trading_pair=order_update.trading_pair,
update_timestamp=order_update.update_timestamp,
new_state=order_update.new_state,
client_order_id=tracked_order.client_order_id,
exchange_order_id=order_update.exchange_order_id,
misc_updates=order_update.misc_updates,
)
self._order_tracker.process_order_update(order_update=new_order_update)
is_partial_fill = order_update.new_state == OrderState.FILLED and not tracked_order.is_filled
if not is_partial_fill:
self._order_tracker.process_order_update(order_update=order_update)
elif channel == "balance":
if event_data.total_balance is not None:
self._account_balances[event_data.asset_name] = event_data.total_balance
Expand Down Expand Up @@ -806,41 +783,21 @@ async def _all_trade_updates_for_order(self, order: GatewayPerpetualInFlightOrde
async def _update_orders_fills(self, orders: List[GatewayPerpetualInFlightOrder]):
oldest_order_creation_time = self.current_timestamp
all_market_ids = set()
orders_by_hash = {}

for order in orders:
oldest_order_creation_time = min(oldest_order_creation_time, order.creation_timestamp)
all_market_ids.add(await self.exchange_symbol_associated_to_pair(trading_pair=order.trading_pair))
if order.exchange_order_id is not None:
orders_by_hash[order.exchange_order_id] = order

try:
start_time = min(oldest_order_creation_time, self._latest_polled_order_fill_time)
trade_updates = await self._data_source.perpetual_trade_updates(market_ids=all_market_ids, start_time=start_time)
trade_updates = await self._data_source.perpetual_trade_updates(
market_ids=all_market_ids, start_time=start_time
)
for trade_update in trade_updates:
tracked_order = orders_by_hash.get(trade_update.exchange_order_id)
if tracked_order is not None:
fee = TradeFeeBase.new_perpetual_fee(
fee_schema=self.trade_fee_schema(),
position_action=tracked_order.position,
percent_token=trade_update.fee.percent_token,
flat_fees=trade_update.fee.flat_fees,
)
new_trade_update = TradeUpdate(
trade_id=trade_update.trade_id,
client_order_id=tracked_order.client_order_id,
exchange_order_id=trade_update.exchange_order_id,
trading_pair=trade_update.trading_pair,
fill_timestamp=trade_update.fill_timestamp,
fill_price=trade_update.fill_price,
fill_base_amount=trade_update.fill_base_amount,
fill_quote_amount=trade_update.fill_quote_amount,
fee=fee,
is_taker=trade_update.is_taker,
)
self._latest_polled_order_fill_time = max(self._latest_polled_order_fill_time,
trade_update.fill_timestamp)
self._order_tracker.process_trade_update(new_trade_update)
self._latest_polled_order_fill_time = max(
self._latest_polled_order_fill_time, trade_update.fill_timestamp
)
self._order_tracker.process_trade_update(trade_update)
except asyncio.CancelledError:
raise
except Exception as ex:
Expand All @@ -856,13 +813,12 @@ async def _request_order_status(self, tracked_order: GatewayPerpetualInFlightOrd
async def _update_orders_with_error_handler(self, orders: List[GatewayPerpetualInFlightOrder], error_handler: Callable):
oldest_order_creation_time = self.current_timestamp
all_market_ids = set()
orders_by_hash = {}
orders_by_id = {}

for order in orders:
oldest_order_creation_time = min(oldest_order_creation_time, order.creation_timestamp)
all_market_ids.add(await self.exchange_symbol_associated_to_pair(trading_pair=order.trading_pair))
if order.exchange_order_id is not None:
orders_by_hash[order.exchange_order_id] = order
orders_by_id[order.client_order_id] = order

try:
order_updates = await self._data_source.perpetual_order_updates(
Expand All @@ -871,48 +827,37 @@ async def _update_orders_with_error_handler(self, orders: List[GatewayPerpetualI
)

for order_update in order_updates:
tracked_order = orders_by_hash.get(order_update.exchange_order_id)
tracked_order = orders_by_id.get(order_update.client_order_id)
if tracked_order is not None:
try:
new_order_update = OrderUpdate(
trading_pair=order_update.trading_pair,
update_timestamp=order_update.update_timestamp,
new_state=order_update.new_state,
client_order_id=tracked_order.client_order_id,
exchange_order_id=order_update.exchange_order_id,
misc_updates=order_update.misc_updates,
)

if tracked_order.current_state == OrderState.PENDING_CREATE and new_order_update.new_state != OrderState.OPEN:
if tracked_order.current_state == OrderState.PENDING_CREATE and order_update.new_state != OrderState.OPEN:
open_update = OrderUpdate(
trading_pair=order_update.trading_pair,
update_timestamp=order_update.update_timestamp,
new_state=OrderState.OPEN,
client_order_id=tracked_order.client_order_id,
client_order_id=order_update.client_order_id,
exchange_order_id=order_update.exchange_order_id,
misc_updates=order_update.misc_updates,
)
self._order_tracker.process_order_update(open_update)

del orders_by_hash[order_update.exchange_order_id]
self._order_tracker.process_order_update(new_order_update)
del orders_by_id[order_update.client_order_id]
self._order_tracker.process_order_update(order_update)
except asyncio.CancelledError:
raise
except Exception as ex:
await error_handler(tracked_order, ex)

if len(orders_by_hash) > 0:
# await self._data_source.check_order_hashes_synchronization(orders=orders_by_hash.values())
for order in orders_by_hash.values():
not_found_error = RuntimeError(
f"There was a problem updating order {order.client_order_id} "
f"({CONSTANTS.ORDER_NOT_FOUND_ERROR_MESSAGE})"
)
await error_handler(order, not_found_error)
for order in orders_by_id.values():
not_found_error = RuntimeError(
f"There was a problem updating order {order.client_order_id} "
f"({CONSTANTS.ORDER_NOT_FOUND_ERROR_MESSAGE})"
)
await error_handler(order, not_found_error)
except asyncio.CancelledError:
raise
except Exception as request_error:
for order in orders_by_hash.values():
for order in orders_by_id.values():
await error_handler(order, request_error)

def _create_web_assistants_factory(self) -> WebAssistantsFactory:
Expand Down Expand Up @@ -1029,46 +974,22 @@ async def _check_orders_transactions(self):
async def _check_orders_creation_transactions(self):
orders: List[GatewayPerpetualInFlightOrder] = self._order_tracker.active_orders.values()
orders_by_creation_tx = defaultdict(list)
orders_with_inconsistent_hash = []

for order in orders:
if order.creation_transaction_hash is not None and order.is_pending_create:
orders_by_creation_tx[order.creation_transaction_hash].append(order)

for transaction_hash, orders in orders_by_creation_tx.items():
all_orders = orders.copy()
try:
order_updates = await self._data_source.order_updates_for_transaction(
transaction_hash=transaction_hash, perpetual_orders=orders
)

for order_update in order_updates:
tracked_order = self._order_tracker.active_orders.get(order_update.client_order_id)
if tracked_order is not None:
all_orders.remove(tracked_order)
if (tracked_order.exchange_order_id is not None
and tracked_order.exchange_order_id != order_update.exchange_order_id):
tracked_order.update_exchange_order_id(order_update.exchange_order_id)
orders_with_inconsistent_hash.append(tracked_order)
self._order_tracker.process_order_update(order_update=order_update)

for not_found_order in all_orders:
self._update_order_after_failure(
order_id=not_found_order.client_order_id,
trading_pair=not_found_order.trading_pair
)

except ValueError:
self.logger().debug(f"Transaction not included in a block yet ({transaction_hash})")

if len(orders_with_inconsistent_hash) > 0:
async with self._data_source.order_creation_lock:
active_orders = [
order for order in self._order_tracker.active_orders.values()
if order not in orders_with_inconsistent_hash and order.current_state == OrderState.PENDING_CREATE
]
await self._data_source.reset_order_hash_generator(active_orders=active_orders)

async def _check_created_orders_status_for_transaction(self, transaction_hash: str):
transaction_orders = []
order: GatewayPerpetualInFlightOrder
Expand Down
Loading

0 comments on commit 1809b40

Please sign in to comment.