Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

# TODO: stop dropping tasks on the floor #18866

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion chia/_tests/core/data_layer/test_data_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@
from chia.wallet.wallet import Wallet
from chia.wallet.wallet_node import WalletNode

pytestmark = pytest.mark.data_layer
pytestmark = [
pytest.mark.data_layer,
pytest.mark.limit_consensus_modes(reason="save time"),
]

altendky marked this conversation as resolved.
Show resolved Hide resolved
nodes = tuple[WalletNode, FullNodeSimulator]
nodes_with_port_bt_ph = tuple[WalletRpcApi, FullNodeSimulator, uint16, bytes32, BlockTools]
wallet_and_port_tuple = tuple[WalletNode, uint16]
Expand Down
2 changes: 2 additions & 0 deletions chia/_tests/core/full_node/test_full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ async def suppress_value_error(coro: Coroutine) -> None:
uint32(0),
blocks_reorg[-2].reward_chain_block.get_unfinished().get_hash(),
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer)))
await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 0))

Expand All @@ -823,6 +824,7 @@ async def suppress_value_error(coro: Coroutine) -> None:
uint32(0),
blocks_reorg[-1].reward_chain_block.get_unfinished().get_hash(),
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer)))
await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 1))

Expand Down
2 changes: 2 additions & 0 deletions chia/daemon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async def listener_task() -> None:
finally:
await self.close()

# TODO: stop dropping tasks on the floor
asyncio.create_task(listener_task())
await asyncio.sleep(1)

Expand All @@ -91,6 +92,7 @@ async def _get(self, request: WsRpcMessage) -> WsRpcMessage:
string = dict_to_json_str(request)
if self.websocket is None or self.websocket.closed:
raise Exception("Websocket is not connected")
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.websocket.send_str(string))
try:
await asyncio.wait_for(self._request_dict[request_id].wait(), timeout=30)
Expand Down
2 changes: 2 additions & 0 deletions chia/farmer/farmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ async def start_task() -> None:
if sys.getprofile() is not None:
self.log.warning("not enabling profiler, getprofile() is already set")
else:
# TODO: stop dropping tasks on the floor
asyncio.create_task(profile_task(self._root_path, "farmer", self.log))

# TODO: stop dropping tasks on the floor
asyncio.create_task(start_task())
try:
yield
Expand Down
6 changes: 6 additions & 0 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ async def manage(self) -> AsyncIterator[None]:
self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof())

if self.config.get("enable_profiler", False):
# TODO: stop dropping tasks on the floor
asyncio.create_task(profile_task(self.root_path, "node", self.log))

self.profile_block_validation = self.config.get("profile_block_validation", False)
Expand All @@ -290,6 +291,7 @@ async def manage(self) -> AsyncIterator[None]:
profile_dir.mkdir(parents=True, exist_ok=True)

if self.config.get("enable_memory_profiler", False):
# TODO: stop dropping tasks on the floor
asyncio.create_task(mem_profile_task(self.root_path, "node", self.log))

time_taken = time.monotonic() - start_time
Expand Down Expand Up @@ -337,6 +339,7 @@ async def manage(self) -> AsyncIterator[None]:

self.initialized = True
if self.full_node_peers is not None:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.full_node_peers.start())
try:
yield
Expand All @@ -353,6 +356,7 @@ async def manage(self) -> AsyncIterator[None]:
self.mempool_manager.shut_down()

if self.full_node_peers is not None:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.full_node_peers.close())
if self.uncompact_task is not None:
self.uncompact_task.cancel()
Expand Down Expand Up @@ -491,6 +495,7 @@ async def _handle_transactions(self) -> None:
# However, doing them one at a time would be slow, because they get sent to other processes.
await self.add_transaction_semaphore.acquire()
item: TransactionQueueEntry = await self.transaction_queue.pop()
# TODO: stop dropping tasks on the floor
asyncio.create_task(self._handle_one_transaction(item))

async def initialize_weight_proof(self) -> None:
Expand Down Expand Up @@ -874,6 +879,7 @@ async def on_connect(self, connection: WSChiaConnection) -> None:
self._state_changed("add_connection")
self._state_changed("sync_mode")
if self.full_node_peers is not None:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.full_node_peers.on_connect(connection))

if self.initialized is False:
Expand Down
2 changes: 2 additions & 0 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ async def eventually_clear() -> None:
await asyncio.sleep(5)
self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, None)

# TODO: stop dropping tasks on the floor
asyncio.create_task(eventually_clear())

return msg
Expand Down Expand Up @@ -527,6 +528,7 @@ async def eventually_clear() -> None:
await asyncio.sleep(5)
self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, foliage_hash)

# TODO: stop dropping tasks on the floor
asyncio.create_task(eventually_clear())

return msg
Expand Down
1 change: 1 addition & 0 deletions chia/rpc/rpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ async def _state_changed(self, change: str, change_data: Optional[dict[str, Any]
def state_changed(self, change: str, change_data: Optional[dict[str, Any]] = None) -> None:
if self.websocket is None or self.websocket.closed:
return None
# TODO: stop dropping tasks on the floor
asyncio.create_task(self._state_changed(change, change_data))

@property
Expand Down
2 changes: 2 additions & 0 deletions chia/seeder/dns_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
dns_request: Optional[DNSRecord] = parse_dns_request(data)
if dns_request is None: # Invalid Request, we can just drop it and move on.
return
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.handler(dns_request, addr))

async def respond(self) -> None:
Expand Down Expand Up @@ -192,6 +193,7 @@ def eof_received(self) -> Optional[bool]:
f"Received incomplete TCP DNS request of length {self.expected_length} from {self.peer_info}, "
f"closing connection after dns replies are sent."
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.wait_for_futures())
return True # Keep connection open, until the futures are done.
log.info(f"Received early EOF from {self.peer_info}, closing connection.")
Expand Down
1 change: 1 addition & 0 deletions chia/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ async def start_client(
self.log.info(f"Connected with {connection_type_str} {target_node}")
else:
self.log.debug(f"Successful feeler connection with {connection_type_str} {target_node}")
# TODO: stop dropping tasks on the floor
asyncio.create_task(connection.close())
return True
except client_exceptions.ClientConnectorError as e:
Expand Down
8 changes: 8 additions & 0 deletions chia/server/ws_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ async def _send_message(self, message: Message) -> None:

# TODO: fix this special case. This function has rate limits which are too low.
if ProtocolMessageTypes(message.type) != ProtocolMessageTypes.respond_peers:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self._wait_and_retry(message))

return None
Expand Down Expand Up @@ -667,6 +668,7 @@ async def _read_one_message(self) -> Optional[Message]:
f"{self.peer_server_port}/"
f"{self.peer_info.port}"
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close())
await asyncio.sleep(3)
elif message.type == WSMsgType.CLOSE:
Expand All @@ -675,10 +677,12 @@ async def _read_one_message(self) -> Optional[Message]:
f"{self.peer_server_port}/"
f"{self.peer_info.port}"
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close())
await asyncio.sleep(3)
elif message.type == WSMsgType.CLOSED:
if not self.closed:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close())
await asyncio.sleep(3)
return None
Expand All @@ -700,6 +704,7 @@ async def _read_one_message(self) -> Optional[Message]:
f"message: {message_type}"
)
# Only full node disconnects peers, to prevent abuse and crashing timelords, farmers, etc
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close(300))
await asyncio.sleep(3)
return None
Expand All @@ -713,13 +718,16 @@ async def _read_one_message(self) -> Optional[Message]:
elif message.type == WSMsgType.ERROR:
self.log.error(f"WebSocket Error: {message}")
if isinstance(message.data, WebSocketError) and message.data.code == WSCloseCode.MESSAGE_TOO_BIG:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close(300))
else:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close())
await asyncio.sleep(3)

else:
self.log.error(f"Unexpected WebSocket message type: {message}")
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close())
await asyncio.sleep(3)
return None
Expand Down
4 changes: 4 additions & 0 deletions chia/wallet/wallet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,11 @@ async def _start_with_fingerprint(
if sys.getprofile() is not None:
self.log.warning("not enabling profiler, getprofile() is already set")
else:
# TODO: stop dropping tasks on the floor
asyncio.create_task(profile_task(self.root_path, "wallet", self.log))

if self.config.get("enable_memory_profiler", False):
# TODO: stop dropping tasks on the floor
asyncio.create_task(mem_profile_task(self.root_path, "wallet", self.log))

path: Path = get_wallet_db_path(self.root_path, self.config, str(fingerprint))
Expand Down Expand Up @@ -517,6 +519,7 @@ def _set_state_changed_callback(self, callback: StateChangedProtocol) -> None:
def _pending_tx_handler(self) -> None:
if self._wallet_state_manager is None:
return None
# TODO: stop dropping tasks on the floor
asyncio.create_task(self._resend_queue())

async def _resend_queue(self) -> None:
Expand Down Expand Up @@ -718,6 +721,7 @@ def initialize_wallet_peers(self) -> None:
default_port,
self.log,
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.wallet_peers.start())

async def on_disconnect(self, peer: WSChiaConnection) -> None:
Expand Down
Loading