From 6e32ecac9cd0393e5a1a80dfb31c98e34dfa7a21 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 13 Nov 2024 16:41:31 -0500 Subject: [PATCH 1/3] # TODO: stop dropping tasks on the floor --- chia/_tests/core/data_layer/test_data_rpc.py | 6 +++++- chia/_tests/core/full_node/test_full_node.py | 2 ++ chia/daemon/client.py | 2 ++ chia/farmer/farmer.py | 2 ++ chia/full_node/full_node.py | 6 ++++++ chia/full_node/full_node_api.py | 2 ++ chia/rpc/rpc_server.py | 1 + chia/seeder/dns_server.py | 2 ++ chia/server/server.py | 1 + chia/server/ws_connection.py | 8 ++++++++ chia/wallet/wallet_node.py | 4 ++++ 11 files changed, 35 insertions(+), 1 deletion(-) diff --git a/chia/_tests/core/data_layer/test_data_rpc.py b/chia/_tests/core/data_layer/test_data_rpc.py index 356d20b6dff1..b168a7c48dfd 100644 --- a/chia/_tests/core/data_layer/test_data_rpc.py +++ b/chia/_tests/core/data_layer/test_data_rpc.py @@ -73,7 +73,11 @@ from chia.wallet.wallet import Wallet from chia.wallet.wallet_node import WalletNode -pytestmark = pytest.mark.data_layer +pytestmark = [ + pytest.mark.data_layer, + pytest.mark.limit_consensus_modes(reason="save time"), +] + nodes = tuple[WalletNode, FullNodeSimulator] nodes_with_port_bt_ph = tuple[WalletRpcApi, FullNodeSimulator, uint16, bytes32, BlockTools] wallet_and_port_tuple = tuple[WalletNode, uint16] diff --git a/chia/_tests/core/full_node/test_full_node.py b/chia/_tests/core/full_node/test_full_node.py index 0c14a0abfaaf..f13490bbe9ee 100644 --- a/chia/_tests/core/full_node/test_full_node.py +++ b/chia/_tests/core/full_node/test_full_node.py @@ -812,6 +812,7 @@ async def suppress_value_error(coro: Coroutine) -> None: uint32(0), blocks_reorg[-2].reward_chain_block.get_unfinished().get_hash(), ) + # TODO: stop dropping tasks on the floor asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 0)) @@ -823,6 +824,7 @@ async def suppress_value_error(coro: Coroutine) -> None: uint32(0), blocks_reorg[-1].reward_chain_block.get_unfinished().get_hash(), ) + # TODO: stop dropping tasks on the floor asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 1)) diff --git a/chia/daemon/client.py b/chia/daemon/client.py index 3a4866ab2f95..c9b6e3400c70 100644 --- a/chia/daemon/client.py +++ b/chia/daemon/client.py @@ -57,6 +57,7 @@ async def listener_task() -> None: finally: await self.close() + # TODO: stop dropping tasks on the floor asyncio.create_task(listener_task()) await asyncio.sleep(1) @@ -81,6 +82,7 @@ async def _get(self, request: WsRpcMessage) -> WsRpcMessage: string = dict_to_json_str(request) if self.websocket is None or self.websocket.closed: raise Exception("Websocket is not connected") + # TODO: stop dropping tasks on the floor asyncio.create_task(self.websocket.send_str(string)) try: await asyncio.wait_for(self._request_dict[request_id].wait(), timeout=30) diff --git a/chia/farmer/farmer.py b/chia/farmer/farmer.py index 98cdbae588c2..0335441ce585 100644 --- a/chia/farmer/farmer.py +++ b/chia/farmer/farmer.py @@ -198,8 +198,10 @@ async def start_task() -> None: if sys.getprofile() is not None: self.log.warning("not enabling profiler, getprofile() is already set") else: + # TODO: stop dropping tasks on the floor asyncio.create_task(profile_task(self._root_path, "farmer", self.log)) + # TODO: stop dropping tasks on the floor asyncio.create_task(start_task()) try: yield diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 71cd11d3fac4..933d77548a08 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -280,6 +280,7 @@ async def manage(self) -> AsyncIterator[None]: self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof()) if self.config.get("enable_profiler", False): + # TODO: stop dropping tasks on the floor asyncio.create_task(profile_task(self.root_path, "node", self.log)) self.profile_block_validation = self.config.get("profile_block_validation", False) @@ -290,6 +291,7 @@ async def manage(self) -> AsyncIterator[None]: profile_dir.mkdir(parents=True, exist_ok=True) if self.config.get("enable_memory_profiler", False): + # TODO: stop dropping tasks on the floor asyncio.create_task(mem_profile_task(self.root_path, "node", self.log)) time_taken = time.monotonic() - start_time @@ -337,6 +339,7 @@ async def manage(self) -> AsyncIterator[None]: self.initialized = True if self.full_node_peers is not None: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.full_node_peers.start()) try: yield @@ -353,6 +356,7 @@ async def manage(self) -> AsyncIterator[None]: self.mempool_manager.shut_down() if self.full_node_peers is not None: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.full_node_peers.close()) if self.uncompact_task is not None: self.uncompact_task.cancel() @@ -491,6 +495,7 @@ async def _handle_transactions(self) -> None: # However, doing them one at a time would be slow, because they get sent to other processes. await self.add_transaction_semaphore.acquire() item: TransactionQueueEntry = await self.transaction_queue.pop() + # TODO: stop dropping tasks on the floor asyncio.create_task(self._handle_one_transaction(item)) async def initialize_weight_proof(self) -> None: @@ -874,6 +879,7 @@ async def on_connect(self, connection: WSChiaConnection) -> None: self._state_changed("add_connection") self._state_changed("sync_mode") if self.full_node_peers is not None: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.full_node_peers.on_connect(connection)) if self.initialized is False: diff --git a/chia/full_node/full_node_api.py b/chia/full_node/full_node_api.py index 7b50c4551869..395038219526 100644 --- a/chia/full_node/full_node_api.py +++ b/chia/full_node/full_node_api.py @@ -459,6 +459,7 @@ async def eventually_clear() -> None: await asyncio.sleep(5) self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, None) + # TODO: stop dropping tasks on the floor asyncio.create_task(eventually_clear()) return msg @@ -527,6 +528,7 @@ async def eventually_clear() -> None: await asyncio.sleep(5) self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, foliage_hash) + # TODO: stop dropping tasks on the floor asyncio.create_task(eventually_clear()) return msg diff --git a/chia/rpc/rpc_server.py b/chia/rpc/rpc_server.py index f8a4d73548dc..f7068df546d4 100644 --- a/chia/rpc/rpc_server.py +++ b/chia/rpc/rpc_server.py @@ -226,6 +226,7 @@ async def _state_changed(self, change: str, change_data: Optional[dict[str, Any] def state_changed(self, change: str, change_data: Optional[dict[str, Any]] = None) -> None: if self.websocket is None or self.websocket.closed: return None + # TODO: stop dropping tasks on the floor asyncio.create_task(self._state_changed(change, change_data)) @property diff --git a/chia/seeder/dns_server.py b/chia/seeder/dns_server.py index 2f37e6966ba6..d178a33fd0ca 100644 --- a/chia/seeder/dns_server.py +++ b/chia/seeder/dns_server.py @@ -80,6 +80,7 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: dns_request: Optional[DNSRecord] = parse_dns_request(data) if dns_request is None: # Invalid Request, we can just drop it and move on. return + # TODO: stop dropping tasks on the floor asyncio.create_task(self.handler(dns_request, addr)) async def respond(self) -> None: @@ -192,6 +193,7 @@ def eof_received(self) -> Optional[bool]: f"Received incomplete TCP DNS request of length {self.expected_length} from {self.peer_info}, " f"closing connection after dns replies are sent." ) + # TODO: stop dropping tasks on the floor asyncio.create_task(self.wait_for_futures()) return True # Keep connection open, until the futures are done. log.info(f"Received early EOF from {self.peer_info}, closing connection.") diff --git a/chia/server/server.py b/chia/server/server.py index 8e3dec10fa56..9583d11a5455 100644 --- a/chia/server/server.py +++ b/chia/server/server.py @@ -500,6 +500,7 @@ async def start_client( connection_type_str = connection.connection_type.name.lower() self.log.info(f"Connected with {connection_type_str} {target_node}") if is_feeler: + # TODO: stop dropping tasks on the floor asyncio.create_task(connection.close()) return True except client_exceptions.ClientConnectorError as e: diff --git a/chia/server/ws_connection.py b/chia/server/ws_connection.py index d72ebd2a945c..20d7a3d75775 100644 --- a/chia/server/ws_connection.py +++ b/chia/server/ws_connection.py @@ -639,6 +639,7 @@ async def _send_message(self, message: Message) -> None: # TODO: fix this special case. This function has rate limits which are too low. if ProtocolMessageTypes(message.type) != ProtocolMessageTypes.respond_peers: + # TODO: stop dropping tasks on the floor asyncio.create_task(self._wait_and_retry(message)) return None @@ -667,6 +668,7 @@ async def _read_one_message(self) -> Optional[Message]: f"{self.peer_server_port}/" f"{self.peer_info.port}" ) + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) await asyncio.sleep(3) elif message.type == WSMsgType.CLOSE: @@ -675,10 +677,12 @@ async def _read_one_message(self) -> Optional[Message]: f"{self.peer_server_port}/" f"{self.peer_info.port}" ) + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) await asyncio.sleep(3) elif message.type == WSMsgType.CLOSED: if not self.closed: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) await asyncio.sleep(3) return None @@ -700,6 +704,7 @@ async def _read_one_message(self) -> Optional[Message]: f"message: {message_type}" ) # Only full node disconnects peers, to prevent abuse and crashing timelords, farmers, etc + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close(300)) await asyncio.sleep(3) return None @@ -713,13 +718,16 @@ async def _read_one_message(self) -> Optional[Message]: elif message.type == WSMsgType.ERROR: self.log.error(f"WebSocket Error: {message}") if isinstance(message.data, WebSocketError) and message.data.code == WSCloseCode.MESSAGE_TOO_BIG: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close(300)) else: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) await asyncio.sleep(3) else: self.log.error(f"Unexpected WebSocket message type: {message}") + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) await asyncio.sleep(3) return None diff --git a/chia/wallet/wallet_node.py b/chia/wallet/wallet_node.py index 2c0a5b54ede6..0ba7ac012176 100644 --- a/chia/wallet/wallet_node.py +++ b/chia/wallet/wallet_node.py @@ -426,9 +426,11 @@ async def _start_with_fingerprint( if sys.getprofile() is not None: self.log.warning("not enabling profiler, getprofile() is already set") else: + # TODO: stop dropping tasks on the floor asyncio.create_task(profile_task(self.root_path, "wallet", self.log)) if self.config.get("enable_memory_profiler", False): + # TODO: stop dropping tasks on the floor asyncio.create_task(mem_profile_task(self.root_path, "wallet", self.log)) path: Path = get_wallet_db_path(self.root_path, self.config, str(fingerprint)) @@ -517,6 +519,7 @@ def _set_state_changed_callback(self, callback: StateChangedProtocol) -> None: def _pending_tx_handler(self) -> None: if self._wallet_state_manager is None: return None + # TODO: stop dropping tasks on the floor asyncio.create_task(self._resend_queue()) async def _resend_queue(self) -> None: @@ -718,6 +721,7 @@ def initialize_wallet_peers(self) -> None: default_port, self.log, ) + # TODO: stop dropping tasks on the floor asyncio.create_task(self.wallet_peers.start()) async def on_disconnect(self, peer: WSChiaConnection) -> None: From 23f31e80ea63d1c6dbf9d5e0f4e72be24294214e Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Thu, 14 Nov 2024 07:39:12 -0500 Subject: [PATCH 2/3] Update chia/_tests/core/data_layer/test_data_rpc.py --- chia/_tests/core/data_layer/test_data_rpc.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/chia/_tests/core/data_layer/test_data_rpc.py b/chia/_tests/core/data_layer/test_data_rpc.py index b168a7c48dfd..356d20b6dff1 100644 --- a/chia/_tests/core/data_layer/test_data_rpc.py +++ b/chia/_tests/core/data_layer/test_data_rpc.py @@ -73,11 +73,7 @@ from chia.wallet.wallet import Wallet from chia.wallet.wallet_node import WalletNode -pytestmark = [ - pytest.mark.data_layer, - pytest.mark.limit_consensus_modes(reason="save time"), -] - +pytestmark = pytest.mark.data_layer nodes = tuple[WalletNode, FullNodeSimulator] nodes_with_port_bt_ph = tuple[WalletRpcApi, FullNodeSimulator, uint16, bytes32, BlockTools] wallet_and_port_tuple = tuple[WalletNode, uint16] From f235106d06001690c9acaac80c92c41da3b36031 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Thu, 14 Nov 2024 18:49:29 -0500 Subject: [PATCH 3/3] no really, stop --- chia/daemon/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/chia/daemon/server.py b/chia/daemon/server.py index d537fd7e9436..e7e9709aaed7 100644 --- a/chia/daemon/server.py +++ b/chia/daemon/server.py @@ -1192,6 +1192,7 @@ async def start_plotting(self, websocket: WebSocketResponse, request: dict[str, log.info(f"Plotting will start in {config['delay']} seconds") # TODO: loop gets passed down a lot, review for potential removal loop = asyncio.get_running_loop() + # TODO: stop dropping tasks on the floor loop.create_task(self._start_plotting(id, loop, queue)) # noqa: RUF006 else: log.info("Plotting will start automatically when previous plotting finish")