From b3ce986787dc4971cc892198d03e4a6db7d5a77a Mon Sep 17 00:00:00 2001 From: Chenna Keshava B S Date: Thu, 6 Jun 2024 13:18:04 -0700 Subject: [PATCH 1/7] [FIX] Issue #709: do not enqueue all responses --- xrpl/asyncio/clients/websocket_base.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/xrpl/asyncio/clients/websocket_base.py b/xrpl/asyncio/clients/websocket_base.py index 6c592347a..3963bc5b0 100644 --- a/xrpl/asyncio/clients/websocket_base.py +++ b/xrpl/asyncio/clients/websocket_base.py @@ -135,9 +135,10 @@ async def _handler(self: WebsocketBase) -> None: # if this response corresponds to request, fulfill the Future if "id" in response_dict and response_dict["id"] in self._open_requests: self._open_requests[response_dict["id"]].set_result(response_dict) - - # enqueue the response for the message queue - cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict) + else: + # a response that corresponds to a request is not enqueued again + # enqueue the response for the message queue + cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict) def _set_up_future(self: WebsocketBase, request: Request) -> None: """ From 444f14b4979d159a85491cbfdb05cd5c84ccd5e7 Mon Sep 17 00:00:00 2001 From: Chenna Keshava B S Date: Fri, 7 Jun 2024 15:02:27 -0700 Subject: [PATCH 2/7] use asyncio.gather to concurrently fund wallets for xchain tests --- tests/integration/it_utils.py | 12 ++++- tests/integration/reusable_values.py | 10 ++-- .../transactions/test_set_oracle.py | 54 +++++++++++-------- xrpl/asyncio/clients/websocket_base.py | 4 +- 4 files changed, 52 insertions(+), 28 deletions(-) diff --git a/tests/integration/it_utils.py b/tests/integration/it_utils.py index b7713c861..d2eb77f0c 100644 --- a/tests/integration/it_utils.py +++ b/tests/integration/it_utils.py @@ -111,6 +111,9 @@ def fund_wallet(wallet: Wallet) -> None: client.request(LEDGER_ACCEPT_REQUEST) +async_wallet_fund_lock = asyncio.Lock() + + async def fund_wallet_async( wallet: Wallet, client: AsyncClient = ASYNC_JSON_RPC_CLIENT ) -> None: @@ -119,8 +122,13 @@ async def fund_wallet_async( destination=wallet.address, amount=FUNDING_AMOUNT, ) - await sign_and_submit_async(payment, client, MASTER_WALLET, check_fee=True) - await client.request(LEDGER_ACCEPT_REQUEST) + + # concurrent wallet_fund operations will attempt to advance the ledger at the same + # time. Consequently, all the funding operations fail, except for the first one. + # using a lock will serialize the access to this critical operation + async with async_wallet_fund_lock: + await sign_and_submit_async(payment, client, MASTER_WALLET, check_fee=True) + await client.request(LEDGER_ACCEPT_REQUEST) # just submits a transaction to the ledger, synchronously diff --git a/tests/integration/reusable_values.py b/tests/integration/reusable_values.py index ae6c10b47..cb85c07dd 100644 --- a/tests/integration/reusable_values.py +++ b/tests/integration/reusable_values.py @@ -29,13 +29,15 @@ # faster) async def _set_up_reusable_values(): wallet = Wallet.create() - await fund_wallet_async(wallet) destination = Wallet.create() - await fund_wallet_async(destination) door_wallet = Wallet.create() - await fund_wallet_async(door_wallet) witness_wallet = Wallet.create() - await fund_wallet_async(witness_wallet) + await asyncio.gather( + fund_wallet_async(wallet), + fund_wallet_async(destination), + fund_wallet_async(door_wallet), + fund_wallet_async(witness_wallet), + ) offer = await sign_and_reliable_submission_async( OfferCreate( diff --git a/tests/integration/transactions/test_set_oracle.py b/tests/integration/transactions/test_set_oracle.py index fc948c16c..cb410c6c1 100644 --- a/tests/integration/transactions/test_set_oracle.py +++ b/tests/integration/transactions/test_set_oracle.py @@ -1,3 +1,4 @@ +import asyncio import time from tests.integration.integration_test_case import IntegrationTestCase @@ -14,32 +15,43 @@ _PROVIDER = str_to_hex("provider") _ASSET_CLASS = str_to_hex("currency") +lock = asyncio.Lock() + class TestSetOracle(IntegrationTestCase): @test_async_and_sync(globals()) async def test_all_fields(self, client): - tx = OracleSet( - account=WALLET.address, - # if oracle_document_id is not modified, the (sync, async) + - # (json, websocket) combination of integration tests will update the same - # oracle object using identical "LastUpdateTime". Updates to an oracle must - # be more recent than its previous LastUpdateTime - # a unique value is obtained for each combination of test run within the - # implementation of the test_async_and_sync decorator. - oracle_document_id=self.value, - provider=_PROVIDER, - asset_class=_ASSET_CLASS, - last_update_time=int(time.time()), - price_data_series=[ - PriceData( - base_asset="XRP", quote_asset="USD", asset_price=740, scale=1 - ), - PriceData( - base_asset="BTC", quote_asset="EUR", asset_price=100, scale=2 - ), - ], + response = await sign_and_reliable_submission_async( + OracleSet( + account=WALLET.address, + # if oracle_document_id is not modified, the (sync, async) + + # (json, websocket) combination of integration tests will update the + # same oracle object using identical "LastUpdateTime". + # Updates to an oracle must be more recent than its previous + # LastUpdateTime a unique value is obtained for each combination + # of test run within the implementation of the + # test_async_and_sync decorator. + oracle_document_id=self.value, + provider=_PROVIDER, + asset_class=_ASSET_CLASS, + # contruct the OracleSet transaction in-place with submit-function, + # in order to obtain the most-recent timestamp. + # Otherwise, async execution of test cases might render this + # timestamp stale. + last_update_time=int(time.time()), + price_data_series=[ + PriceData( + base_asset="XRP", quote_asset="USD", asset_price=740, scale=1 + ), + PriceData( + base_asset="BTC", quote_asset="EUR", asset_price=100, scale=2 + ), + ], + ), + WALLET, + client, ) - response = await sign_and_reliable_submission_async(tx, WALLET, client) + self.assertEqual(response.status, ResponseStatus.SUCCESS) self.assertEqual(response.result["engine_result"], "tesSUCCESS") diff --git a/xrpl/asyncio/clients/websocket_base.py b/xrpl/asyncio/clients/websocket_base.py index 3963bc5b0..a500f56f4 100644 --- a/xrpl/asyncio/clients/websocket_base.py +++ b/xrpl/asyncio/clients/websocket_base.py @@ -123,7 +123,9 @@ async def _handler(self: WebsocketBase) -> None: messages we check whether there is an outstanding future we need to resolve, and if so do so. - Then we store the already-parsed JSON in our own queue for generic iteration. + If the message corresponds to a pending request, it is stored appropriately, + otherwise we store the already-parsed JSON in our own queue for generic + iteration. As long as a given client remains open, this handler will be running as a Task. """ From 24df9378df0a796dcf34f03b9027cdd421e4036e Mon Sep 17 00:00:00 2001 From: Chenna Keshava B S Date: Tue, 11 Jun 2024 14:59:38 -0700 Subject: [PATCH 3/7] Revert "use asyncio.gather to concurrently fund wallets for xchain tests" This reverts commit 444f14b4979d159a85491cbfdb05cd5c84ccd5e7. --- tests/integration/it_utils.py | 12 +---- tests/integration/reusable_values.py | 10 ++-- .../transactions/test_set_oracle.py | 54 ++++++++----------- xrpl/asyncio/clients/websocket_base.py | 4 +- 4 files changed, 28 insertions(+), 52 deletions(-) diff --git a/tests/integration/it_utils.py b/tests/integration/it_utils.py index d2eb77f0c..b7713c861 100644 --- a/tests/integration/it_utils.py +++ b/tests/integration/it_utils.py @@ -111,9 +111,6 @@ def fund_wallet(wallet: Wallet) -> None: client.request(LEDGER_ACCEPT_REQUEST) -async_wallet_fund_lock = asyncio.Lock() - - async def fund_wallet_async( wallet: Wallet, client: AsyncClient = ASYNC_JSON_RPC_CLIENT ) -> None: @@ -122,13 +119,8 @@ async def fund_wallet_async( destination=wallet.address, amount=FUNDING_AMOUNT, ) - - # concurrent wallet_fund operations will attempt to advance the ledger at the same - # time. Consequently, all the funding operations fail, except for the first one. - # using a lock will serialize the access to this critical operation - async with async_wallet_fund_lock: - await sign_and_submit_async(payment, client, MASTER_WALLET, check_fee=True) - await client.request(LEDGER_ACCEPT_REQUEST) + await sign_and_submit_async(payment, client, MASTER_WALLET, check_fee=True) + await client.request(LEDGER_ACCEPT_REQUEST) # just submits a transaction to the ledger, synchronously diff --git a/tests/integration/reusable_values.py b/tests/integration/reusable_values.py index cb85c07dd..ae6c10b47 100644 --- a/tests/integration/reusable_values.py +++ b/tests/integration/reusable_values.py @@ -29,15 +29,13 @@ # faster) async def _set_up_reusable_values(): wallet = Wallet.create() + await fund_wallet_async(wallet) destination = Wallet.create() + await fund_wallet_async(destination) door_wallet = Wallet.create() + await fund_wallet_async(door_wallet) witness_wallet = Wallet.create() - await asyncio.gather( - fund_wallet_async(wallet), - fund_wallet_async(destination), - fund_wallet_async(door_wallet), - fund_wallet_async(witness_wallet), - ) + await fund_wallet_async(witness_wallet) offer = await sign_and_reliable_submission_async( OfferCreate( diff --git a/tests/integration/transactions/test_set_oracle.py b/tests/integration/transactions/test_set_oracle.py index cb410c6c1..fc948c16c 100644 --- a/tests/integration/transactions/test_set_oracle.py +++ b/tests/integration/transactions/test_set_oracle.py @@ -1,4 +1,3 @@ -import asyncio import time from tests.integration.integration_test_case import IntegrationTestCase @@ -15,43 +14,32 @@ _PROVIDER = str_to_hex("provider") _ASSET_CLASS = str_to_hex("currency") -lock = asyncio.Lock() - class TestSetOracle(IntegrationTestCase): @test_async_and_sync(globals()) async def test_all_fields(self, client): - response = await sign_and_reliable_submission_async( - OracleSet( - account=WALLET.address, - # if oracle_document_id is not modified, the (sync, async) + - # (json, websocket) combination of integration tests will update the - # same oracle object using identical "LastUpdateTime". - # Updates to an oracle must be more recent than its previous - # LastUpdateTime a unique value is obtained for each combination - # of test run within the implementation of the - # test_async_and_sync decorator. - oracle_document_id=self.value, - provider=_PROVIDER, - asset_class=_ASSET_CLASS, - # contruct the OracleSet transaction in-place with submit-function, - # in order to obtain the most-recent timestamp. - # Otherwise, async execution of test cases might render this - # timestamp stale. - last_update_time=int(time.time()), - price_data_series=[ - PriceData( - base_asset="XRP", quote_asset="USD", asset_price=740, scale=1 - ), - PriceData( - base_asset="BTC", quote_asset="EUR", asset_price=100, scale=2 - ), - ], - ), - WALLET, - client, + tx = OracleSet( + account=WALLET.address, + # if oracle_document_id is not modified, the (sync, async) + + # (json, websocket) combination of integration tests will update the same + # oracle object using identical "LastUpdateTime". Updates to an oracle must + # be more recent than its previous LastUpdateTime + # a unique value is obtained for each combination of test run within the + # implementation of the test_async_and_sync decorator. + oracle_document_id=self.value, + provider=_PROVIDER, + asset_class=_ASSET_CLASS, + last_update_time=int(time.time()), + price_data_series=[ + PriceData( + base_asset="XRP", quote_asset="USD", asset_price=740, scale=1 + ), + PriceData( + base_asset="BTC", quote_asset="EUR", asset_price=100, scale=2 + ), + ], ) - + response = await sign_and_reliable_submission_async(tx, WALLET, client) self.assertEqual(response.status, ResponseStatus.SUCCESS) self.assertEqual(response.result["engine_result"], "tesSUCCESS") diff --git a/xrpl/asyncio/clients/websocket_base.py b/xrpl/asyncio/clients/websocket_base.py index a500f56f4..3963bc5b0 100644 --- a/xrpl/asyncio/clients/websocket_base.py +++ b/xrpl/asyncio/clients/websocket_base.py @@ -123,9 +123,7 @@ async def _handler(self: WebsocketBase) -> None: messages we check whether there is an outstanding future we need to resolve, and if so do so. - If the message corresponds to a pending request, it is stored appropriately, - otherwise we store the already-parsed JSON in our own queue for generic - iteration. + Then we store the already-parsed JSON in our own queue for generic iteration. As long as a given client remains open, this handler will be running as a Task. """ From c4b35c1c7ebd3055e860a35ffd29c9de593e4bd9 Mon Sep 17 00:00:00 2001 From: Chenna Keshava B S Date: Fri, 14 Jun 2024 16:05:58 -0700 Subject: [PATCH 4/7] integration tests to measure the memory growth of Websocket clients -- Many thanks to https://github.com/Jbekker for pointing out the issue and the testcase for the same --- .../clients/test_async_websocket_client.py | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 tests/integration/clients/test_async_websocket_client.py diff --git a/tests/integration/clients/test_async_websocket_client.py b/tests/integration/clients/test_async_websocket_client.py new file mode 100644 index 000000000..a5acc10d7 --- /dev/null +++ b/tests/integration/clients/test_async_websocket_client.py @@ -0,0 +1,91 @@ +"""Memory usage of Websocket clients.""" + +from __future__ import annotations + +import asyncio +from unittest import TestCase + +from xrpl.asyncio.clients import AsyncWebsocketClient +from xrpl.clients.websocket_client import WebsocketClient +from xrpl.models.currencies import XRP, IssuedCurrency +from xrpl.models.requests import BookOffers + +try: + from unittest import IsolatedAsyncioTestCase +except ImportError: + from aiounittest import AsyncTestCase as IsolatedAsyncioTestCase # type: ignore + + +class TestAsyncWebsocketClient(IsolatedAsyncioTestCase): + """Memory usage of async-websocket client""" + + async def test_msg_queue_async_websocket_client( + self: TestAsyncWebsocketClient, + ) -> None: + """Test the rate of growth of the Message queue in async_websocket_client under + persistent load. Admittedly, this is not a precise measure, rather its a proxy + to measure the memory footprint of the client + """ + async with AsyncWebsocketClient("wss://s1.ripple.com") as client: + for _ in range(5): + await client.request( + BookOffers( + ledger_index="current", + taker_gets=XRP(), + taker_pays=IssuedCurrency( + currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" + ), + limit=500, + ) + ) + + await client.request( + BookOffers( + ledger_index="current", + taker_gets=IssuedCurrency( + currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" + ), + taker_pays=XRP(), + limit=500, + ) + ) + + self.assertEqual(client._messages.qsize(), 0) + await asyncio.sleep(2) + + +class TestSyncWebsocketClient(TestCase): + """Memory usage of sync-websocket client""" + + def test_msg_queue_sync_websocket_client( + self: TestSyncWebsocketClient, + ) -> None: + """Test the rate of growth of the Message queue in sync_websocket_client under + persistent load. Admittedly, this is not a precise measure, rather its a proxy + to measure the memory footprint of the client + """ + with WebsocketClient("wss://s1.ripple.com") as client: + for _ in range(5): + client.request( + BookOffers( + ledger_index="current", + taker_gets=XRP(), + taker_pays=IssuedCurrency( + currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" + ), + limit=500, + ) + ) + + client.request( + BookOffers( + ledger_index="current", + taker_gets=IssuedCurrency( + currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" + ), + taker_pays=XRP(), + limit=500, + ) + ) + + self.assertEqual(client._messages.qsize(), 0) From f3597bcf4a0963d5755b43e9676568bf444a615e Mon Sep 17 00:00:00 2001 From: Chenna Keshava B S Date: Fri, 14 Jun 2024 16:09:30 -0700 Subject: [PATCH 5/7] improve wording of the comments --- xrpl/asyncio/clients/websocket_base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/xrpl/asyncio/clients/websocket_base.py b/xrpl/asyncio/clients/websocket_base.py index 3963bc5b0..fee70c2b3 100644 --- a/xrpl/asyncio/clients/websocket_base.py +++ b/xrpl/asyncio/clients/websocket_base.py @@ -1,4 +1,5 @@ """A client for interacting with the rippled WebSocket API.""" + from __future__ import annotations import asyncio @@ -135,9 +136,9 @@ async def _handler(self: WebsocketBase) -> None: # if this response corresponds to request, fulfill the Future if "id" in response_dict and response_dict["id"] in self._open_requests: self._open_requests[response_dict["id"]].set_result(response_dict) + # a response that fulfills a future is not enqueued again else: - # a response that corresponds to a request is not enqueued again - # enqueue the response for the message queue + # otherwise, enqueue the response into the message queue cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict) def _set_up_future(self: WebsocketBase, request: Request) -> None: From 88b0428a0083712ac344fc769ce6ccd5a73cea0c Mon Sep 17 00:00:00 2001 From: Chenna Keshava B S Date: Wed, 26 Jun 2024 14:12:05 -0700 Subject: [PATCH 6/7] address PR comment: remove comment --- xrpl/asyncio/clients/websocket_base.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/xrpl/asyncio/clients/websocket_base.py b/xrpl/asyncio/clients/websocket_base.py index fee70c2b3..f0c24be3a 100644 --- a/xrpl/asyncio/clients/websocket_base.py +++ b/xrpl/asyncio/clients/websocket_base.py @@ -136,9 +136,7 @@ async def _handler(self: WebsocketBase) -> None: # if this response corresponds to request, fulfill the Future if "id" in response_dict and response_dict["id"] in self._open_requests: self._open_requests[response_dict["id"]].set_result(response_dict) - # a response that fulfills a future is not enqueued again else: - # otherwise, enqueue the response into the message queue cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict) def _set_up_future(self: WebsocketBase, request: Request) -> None: From 3a53ffc66ab7b99675d8a5fbb59aa52ae73d6d4f Mon Sep 17 00:00:00 2001 From: Chenna Keshava B S Date: Thu, 27 Jun 2024 11:51:57 -0700 Subject: [PATCH 7/7] use async_and_sync construct for running the integ tests --- .../clients/test_async_websocket_client.py | 91 ------------------- .../clients/test_websocket_client.py | 52 +++++++++++ 2 files changed, 52 insertions(+), 91 deletions(-) delete mode 100644 tests/integration/clients/test_async_websocket_client.py create mode 100644 tests/integration/clients/test_websocket_client.py diff --git a/tests/integration/clients/test_async_websocket_client.py b/tests/integration/clients/test_async_websocket_client.py deleted file mode 100644 index a5acc10d7..000000000 --- a/tests/integration/clients/test_async_websocket_client.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Memory usage of Websocket clients.""" - -from __future__ import annotations - -import asyncio -from unittest import TestCase - -from xrpl.asyncio.clients import AsyncWebsocketClient -from xrpl.clients.websocket_client import WebsocketClient -from xrpl.models.currencies import XRP, IssuedCurrency -from xrpl.models.requests import BookOffers - -try: - from unittest import IsolatedAsyncioTestCase -except ImportError: - from aiounittest import AsyncTestCase as IsolatedAsyncioTestCase # type: ignore - - -class TestAsyncWebsocketClient(IsolatedAsyncioTestCase): - """Memory usage of async-websocket client""" - - async def test_msg_queue_async_websocket_client( - self: TestAsyncWebsocketClient, - ) -> None: - """Test the rate of growth of the Message queue in async_websocket_client under - persistent load. Admittedly, this is not a precise measure, rather its a proxy - to measure the memory footprint of the client - """ - async with AsyncWebsocketClient("wss://s1.ripple.com") as client: - for _ in range(5): - await client.request( - BookOffers( - ledger_index="current", - taker_gets=XRP(), - taker_pays=IssuedCurrency( - currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" - ), - limit=500, - ) - ) - - await client.request( - BookOffers( - ledger_index="current", - taker_gets=IssuedCurrency( - currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" - ), - taker_pays=XRP(), - limit=500, - ) - ) - - self.assertEqual(client._messages.qsize(), 0) - await asyncio.sleep(2) - - -class TestSyncWebsocketClient(TestCase): - """Memory usage of sync-websocket client""" - - def test_msg_queue_sync_websocket_client( - self: TestSyncWebsocketClient, - ) -> None: - """Test the rate of growth of the Message queue in sync_websocket_client under - persistent load. Admittedly, this is not a precise measure, rather its a proxy - to measure the memory footprint of the client - """ - with WebsocketClient("wss://s1.ripple.com") as client: - for _ in range(5): - client.request( - BookOffers( - ledger_index="current", - taker_gets=XRP(), - taker_pays=IssuedCurrency( - currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" - ), - limit=500, - ) - ) - - client.request( - BookOffers( - ledger_index="current", - taker_gets=IssuedCurrency( - currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" - ), - taker_pays=XRP(), - limit=500, - ) - ) - - self.assertEqual(client._messages.qsize(), 0) diff --git a/tests/integration/clients/test_websocket_client.py b/tests/integration/clients/test_websocket_client.py new file mode 100644 index 000000000..c2fa86def --- /dev/null +++ b/tests/integration/clients/test_websocket_client.py @@ -0,0 +1,52 @@ +"""Performance testing of Websocket clients.""" + +from __future__ import annotations + +from tests.integration.integration_test_case import IntegrationTestCase +from tests.integration.it_utils import test_async_and_sync +from xrpl.models.currencies import XRP, IssuedCurrency +from xrpl.models.requests import BookOffers +from xrpl.models.response import ResponseStatus + + +class TestWebsocketClient(IntegrationTestCase): + """Memory usage of websocket client""" + + @test_async_and_sync(globals(), websockets_only=True) + async def test_msg_queue_growth_websocket_client(self, client): + """Test the rate of growth of the Message queue in websocket_client under + persistent load. Admittedly, this is not a precise measure, rather its a proxy + to measure the memory footprint of the client + """ + + for _ in range(5): + response = await client.request( + BookOffers( + ledger_index="current", + taker_gets=XRP(), + taker_pays=IssuedCurrency( + currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" + ), + limit=500, + ) + ) + + self.assertEqual(response.status, ResponseStatus.SUCCESS) + + response = await client.request( + BookOffers( + ledger_index="current", + taker_gets=IssuedCurrency( + currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" + ), + taker_pays=XRP(), + limit=500, + ) + ) + self.assertEqual(response.status, ResponseStatus.SUCCESS) + + self.assertEqual(client._messages.qsize(), 0) + + # the messages queue has not increased in proportion to the requests/responses + # input load + self.assertEqual(client._messages.qsize(), 0)