[FIX] Issue #709: do not enqueue all responses #713

Open · wants to merge 9 commits into base: main
91 changes: 91 additions & 0 deletions tests/integration/clients/test_async_websocket_client.py
@@ -0,0 +1,91 @@
"""Memory usage of Websocket clients."""
Collaborator:

This file should be called test_websocket_client and the description should be more general, as it may be used for other tests in the future.

Collaborator Author:

fixed in 3a53ffc


from __future__ import annotations

import asyncio
from unittest import TestCase

from xrpl.asyncio.clients import AsyncWebsocketClient
from xrpl.clients.websocket_client import WebsocketClient
from xrpl.models.currencies import XRP, IssuedCurrency
from xrpl.models.requests import BookOffers

try:
    from unittest import IsolatedAsyncioTestCase
except ImportError:
    from aiounittest import AsyncTestCase as IsolatedAsyncioTestCase  # type: ignore


class TestAsyncWebsocketClient(IsolatedAsyncioTestCase):
"""Memory usage of async-websocket client"""

    async def test_msg_queue_async_websocket_client(
Collaborator:

This test can still use the test_async_and_sync framework - there's a websockets_only parameter in the decorator.

Collaborator Author:

The current version of the code uses the test_async_and_sync construct; thanks for your suggestion.

But I don't like this version: because of the use of await client.request(...), the requests are fulfilled serially, so the queue size will never exceed one in this test.

Ideally, here is what I'd like to do: asyncio.create_task(...) creates many requests concurrently, and we can then verify that the responses are not added to the queue.

class TestAsyncWebsocketClient(IntegrationTestCase):
    """Memory usage of async-websocket client"""

    @test_async_and_sync(globals(), websockets_only=True)
    async def test_msg_queue_async_websocket_client(self, client):
        """Test the rate of growth of the Message queue in async_websocket_client under
        persistent load. Admittedly, this is not a precise measure, rather its a proxy
        to measure the memory footprint of the client
        """

        for _ in range(5):
            asyncio.create_task(
                client.request(
                    BookOffers(
                        ledger_index="current",
                        taker_gets=XRP(),
                        taker_pays=IssuedCurrency(
                            currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
                        ),
                        limit=500,
                    )
                )
            )

            asyncio.create_task(
                client.request(
                    BookOffers(
                        ledger_index="current",
                        taker_gets=IssuedCurrency(
                            currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
                        ),
                        taker_pays=XRP(),
                        limit=500,
                    )
                )
            )

            # messages corresponding to the above two requests might still reside in
            # the queue; if those requests have already been fulfilled, the queue size
            # will be correspondingly smaller
            self.assertLessEqual(client._messages.qsize(), 2)

        # wait for some time: we want to ensure that all the requests are registered
        # in the websocket client, since asyncio populates the queue asynchronously
        await asyncio.sleep(2)
        # the message queue has not grown in proportion to the requests/responses
        self.assertLessEqual(client._messages.qsize(), 2)

I'm trying to accommodate asyncio.create_task inside the test_async_and_sync decorator.
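Roughly, here is a sketch of what I mean (illustrative only; it assumes the decorator passes in an already-connected websocket client, and the test name is just a placeholder):

    @test_async_and_sync(globals(), websockets_only=True)
    async def test_msg_queue_concurrent_requests(self, client):
        """Fire the requests concurrently instead of awaiting each one in turn."""
        tasks = [
            asyncio.create_task(
                client.request(
                    BookOffers(
                        ledger_index="current",
                        taker_gets=XRP(),
                        taker_pays=IssuedCurrency(
                            currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
                        ),
                        limit=500,
                    )
                )
            )
            for _ in range(10)
        ]
        # wait for every in-flight request so no task is left dangling
        await asyncio.gather(*tasks)
        # responses that fulfilled a Future should not also sit in the message queue
        self.assertLessEqual(client._messages.qsize(), 2)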

What do you think?

Collaborator:

What about using subscriptions? Can those stay in the queue?

Collaborator Author:

I'm not able to test the Subscribe method correctly. Contrary to my understanding, this code prints only one message and then gets stuck; it does not print the stream of messages from the other peer indefinitely.

What am I missing? The structure of the code is similar to the tests here

class TestSubscriptionWebsocketClient(IntegrationTestCase):
    @test_async_and_sync(globals(), websockets_only=True)
    async def test_msg_queue_size(self, client):
        req = Subscribe(streams=[StreamParameter.LEDGER])
        # NOTE: this code will run forever without a timeout, until the process is killed
        await client.send(req)

        # Note: AsyncWebsocket clients need the "async for" construct for execution.
        # Alternatively, we will need to implement the __aiter__ method in the
        # respective class.
        async for message in client:
            print(message)
            print("Queue size: ")
            print(client._messages.qsize())

Collaborator:

If you're connecting to a standalone node, you also need to close ledgers independently. A standalone node won't do that automatically for you. That's why there's the additional await accept_ledger_async() in the for loop in the test.

Alternatively, you can add use_testnet=True in the test_async_and_sync decorator.
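
On a standalone node, the loop might look roughly like this (a sketch only; the import path and exact signature of accept_ledger_async are assumed from the integration-test helpers mentioned above):

    @test_async_and_sync(globals(), websockets_only=True)
    async def test_msg_queue_size(self, client):
        req = Subscribe(streams=[StreamParameter.LEDGER])
        await client.send(req)

        count = 0
        async for message in client:
            # a standalone node will not close ledgers on its own, so close one
            # manually here to keep the subscription stream flowing
            await accept_ledger_async()  # assumed test helper, see above
            print(message)
            print("Queue size:", client._messages.qsize())
            count += 1
            if count >= 3:  # bound the loop so the test terminates
                break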

Collaborator:

Might also be worth considering rewriting the use_testnet=True parameter into something that'll also allow you to connect to mainnet (i.e. not a boolean, so you can connect to other things as well).
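
Something like this, for instance (the network parameter and its values are purely hypothetical, not an existing option of the decorator):

    # hypothetical: a network selector instead of a boolean flag
    @test_async_and_sync(globals(), websockets_only=True, network="testnet")
    async def test_msg_queue_size_on_testnet(self, client):
        ...

    @test_async_and_sync(globals(), websockets_only=True, network="mainnet")
    async def test_msg_queue_size_on_mainnet(self, client):
        ...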

Collaborator Author:

got it, thanks.

Subscribe does use the above message queue. Every response from the subscription stream adds one message, but that message is removed almost immediately. I don't know which method is popping the messages off, but the message queue stays empty.

Are you proposing that we use the Subscribe request for the message-queue tests? It is not representative of a request-response back-and-forth, though.

Collaborator:

Ok, fair. I was just thinking about commands that would legitimately grow the queue.
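
For example, something like this would grow the queue, since nothing iterates the client to drain it (a rough standalone sketch against mainnet, not part of the PR):

import asyncio

from xrpl.asyncio.clients import AsyncWebsocketClient
from xrpl.models.requests import StreamParameter, Subscribe


async def show_queue_growth() -> None:
    """Subscribe to the ledger stream without ever iterating the client."""
    async with AsyncWebsocketClient("wss://s1.ripple.com") as client:
        await client.send(Subscribe(streams=[StreamParameter.LEDGER]))
        # mainnet closes a ledger every few seconds; without an
        # `async for message in client` loop, the stream messages accumulate
        await asyncio.sleep(15)
        print("Queue size:", client._messages.qsize())


if __name__ == "__main__":
    asyncio.run(show_queue_growth())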

        self: TestAsyncWebsocketClient,
    ) -> None:
        """Test the rate of growth of the message queue in async_websocket_client under
        persistent load. Admittedly, this is not a precise measure; rather, it is a proxy
        for the memory footprint of the client.
        """
        async with AsyncWebsocketClient("wss://s1.ripple.com") as client:
            for _ in range(5):
                await client.request(
                    BookOffers(
                        ledger_index="current",
                        taker_gets=XRP(),
                        taker_pays=IssuedCurrency(
                            currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
                        ),
                        limit=500,
                    )
                )

                await client.request(
                    BookOffers(
                        ledger_index="current",
                        taker_gets=IssuedCurrency(
                            currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
                        ),
                        taker_pays=XRP(),
                        limit=500,
                    )
                )

                self.assertEqual(client._messages.qsize(), 0)
            await asyncio.sleep(2)


class TestSyncWebsocketClient(TestCase):
    """Memory usage of sync-websocket client"""

    def test_msg_queue_sync_websocket_client(
        self: TestSyncWebsocketClient,
    ) -> None:
        """Test the rate of growth of the message queue in sync_websocket_client under
        persistent load. Admittedly, this is not a precise measure; rather, it is a proxy
        for the memory footprint of the client.
        """
        with WebsocketClient("wss://s1.ripple.com") as client:
            for _ in range(5):
                client.request(
                    BookOffers(
                        ledger_index="current",
                        taker_gets=XRP(),
                        taker_pays=IssuedCurrency(
                            currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
                        ),
                        limit=500,
                    )
                )

                client.request(
                    BookOffers(
                        ledger_index="current",
                        taker_gets=IssuedCurrency(
                            currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
                        ),
                        taker_pays=XRP(),
                        limit=500,
                    )
                )

                self.assertEqual(client._messages.qsize(), 0)
8 changes: 5 additions & 3 deletions xrpl/asyncio/clients/websocket_base.py
@@ -1,4 +1,5 @@
"""A client for interacting with the rippled WebSocket API."""

from __future__ import annotations

import asyncio
@@ -135,9 +136,10 @@ async def _handler(self: WebsocketBase) -> None:
# if this response corresponds to request, fulfill the Future
if "id" in response_dict and response_dict["id"] in self._open_requests:
    self._open_requests[response_dict["id"]].set_result(response_dict)

# enqueue the response for the message queue
cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict)
# a response that fulfills a future is not enqueued again
Collaborator:

This comment is more confusing than useful IMO.

Collaborator Author:

fixed in 3a53ffc

else:
    # otherwise, enqueue the response into the message queue
    cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict)

def _set_up_future(self: WebsocketBase, request: Request) -> None:
"""