[FIX] Issue #709: do not enqueue all responses #713

Open
wants to merge 9 commits into base: main

Conversation


@ckeshava ckeshava commented Jun 6, 2024

High Level Overview of Change

Do not enqueue messages that correspond to an existing request

Context of Change

Bug fix: at the moment, every incoming message is enqueued, even when it is the response to a pending request.
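
For context, here is a minimal, hypothetical sketch of the dispatch behavior this change aims for. It assumes a handler that resolves pending request futures and an internal message queue (the names open_requests and messages mirror the _open_requests and _messages attributes discussed below); the actual xrpl-py handler differs in its details.

# A minimal, hypothetical sketch of the intended dispatch logic; not the exact
# xrpl-py implementation.
from __future__ import annotations

import json
from asyncio import Future, Queue
from typing import Any, Dict


async def handle_incoming(
    raw: str,
    open_requests: Dict[str, Future[Dict[str, Any]]],
    messages: Queue[Dict[str, Any]],
) -> None:
    response_dict: Dict[str, Any] = json.loads(raw)
    request_id = response_dict.get("id")
    if request_id is not None and request_id in open_requests:
        # The response fulfills a pending request: resolve its Future and do
        # not enqueue it a second time (this is the behavior the PR adds).
        open_requests[request_id].set_result(response_dict)
    else:
        # Unsolicited messages (e.g. subscription stream updates) still go onto
        # the message queue for consumers that iterate over the client.
        messages.put_nowait(response_dict)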

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Refactor (non-breaking change that only restructures code)
  • Tests (You added tests for code that already exists, or your new feature included in this PR)
  • Documentation Updates
  • Release

Did you update CHANGELOG.md?

  • Yes
  • No, this change does not impact library users
This change affects the memory usage of the system and hence is not directly perceivable by end users.

@ckeshava ckeshava requested a review from mvadari June 6, 2024 23:37

ckeshava commented Jun 6, 2024

I'm not able to find requests which increase the queue size

from __future__ import annotations

from unittest import TestCase

from xrpl.clients import WebsocketClient
from xrpl.models.requests import ServerInfo


class TestWebSocketClient(TestCase):
    """Performance test of the web_socket_client."""

    def test_web_socket_client(self: TestWebSocketClient) -> None:
        URL = "wss://s.devnet.rippletest.net:51233"

        with WebsocketClient(URL) as client:
            pendings_msg_size = []

            for _ in range(100):
                client.request(ServerInfo())
                pendings_msg_size.append(len(client._open_requests))

            print(max(pendings_msg_size))  # prints 0


mvadari commented Jun 7, 2024

> I'm not able to find requests which increase the queue size

Try client._messages instead of client._open_requests.
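
For illustration, the suggested measurement could look roughly like this (a sketch, assuming the sync WebsocketClient exposes the internal _messages queue as in the snippets in this thread):

from xrpl.clients import WebsocketClient
from xrpl.models.requests import ServerInfo

URL = "wss://s.devnet.rippletest.net:51233"

with WebsocketClient(URL) as client:
    queue_sizes = []
    for _ in range(100):
        client.request(ServerInfo())
        # Track how large the internal message queue gets across requests.
        queue_sizes.append(client._messages.qsize())
    print(max(queue_sizes))
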

# Concurrent wallet_fund operations will attempt to advance the ledger at the
# same time. Consequently, all the funding operations fail except for the first
# one. Using a lock serializes access to this critical operation.
async with async_wallet_fund_lock:
Collaborator Author

@mvadari I found that adding a lock to this section of the code eliminated the "Account Not Found" error.
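
For reference, the locking pattern described here looks roughly like the following sketch. async_wallet_fund_lock and fund_wallet_async are the names used in this PR; the wrapper function and the import path are assumptions for illustration.

import asyncio

from tests.integration.it_utils import fund_wallet_async  # import path assumed

# Module-level lock shared by all funding coroutines (name taken from the diff).
async_wallet_fund_lock = asyncio.Lock()


async def fund_wallet_serially(wallet) -> None:
    # Concurrent wallet_fund operations would try to advance the ledger at the
    # same time, so all but the first would fail ("Account Not Found"). Holding
    # the lock serializes access to this critical section.
    async with async_wallet_fund_lock:
        await fund_wallet_async(wallet)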


ckeshava commented Jun 7, 2024

> _messages

client._messages has a qsize of 0

witness_wallet = Wallet.create()
await fund_wallet_async(witness_wallet)
await asyncio.gather(
Collaborator

Please stick to one change per PR - this is unrelated to this PR.

@ckeshava ckeshava requested a review from mvadari June 11, 2024 22:23

mvadari commented Jun 12, 2024

How did you test this change? If not by adding a test case, then what?

@ckeshava
Collaborator Author

I don't have a good way to test this change. But irrespective of its impact on this issue, I think the change is useful.

I don't know which inputs would reproduce the issue description.

@@ -0,0 +1,91 @@
"""Memory usage of Websocket clients."""
Collaborator

This file should be called test_websocket_client and the description should be more general, as it may be used for other tests in the future.

Collaborator Author

fixed in 3a53ffc

class TestAsyncWebsocketClient(IsolatedAsyncioTestCase):
    """Memory usage of async-websocket client"""

    async def test_msg_queue_async_websocket_client(
Collaborator

This test can still use the test_async_and_sync framework - there's a websockets_only parameter in the decorator.

Collaborator Author

The current version of the code uses the test_async_and_sync construct, thanks for your suggestion.

But I don't like this version. Due to the use of await client.request(...), the requests are fulfilled serially, so the queue size will never exceed one in this test.

Ideally, here is what I'd like to do: asyncio.create_task(...) issues many requests concurrently, and we can then verify that the responses are not accumulated in the queue.

class TestAsyncWebsocketClient(IntegrationTestCase):
    """Memory usage of async-websocket client"""

    @test_async_and_sync(globals(), websockets_only=True)
    async def test_msg_queue_async_websocket_client(self, client):
        """Test the rate of growth of the Message queue in async_websocket_client under
        persistent load. Admittedly, this is not a precise measure, rather its a proxy
        to measure the memory footprint of the client
        """

        for _ in range(5):
            asyncio.create_task(
                client.request(
                    BookOffers(
                        ledger_index="current",
                        taker_gets=XRP(),
                        taker_pays=IssuedCurrency(
                            currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
                        ),
                        limit=500,
                    )
                )
            )

            asyncio.create_task(
                client.request(
                    BookOffers(
                        ledger_index="current",
                        taker_gets=IssuedCurrency(
                            currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
                        ),
                        taker_pays=XRP(),
                        limit=500,
                    )
                )
            )

            # Messages corresponding to the above two requests might reside in the
            # queue. If these requests have already been fulfilled, the queue size
            # will be reduced by an appropriate amount.
            self.assertLessEqual(client._messages.qsize(), 2)

        # Wait for some time; we want to ensure that all the requests are registered in
        # the websocket client. The asyncio library populates the queue asynchronously.
        await asyncio.sleep(2)
        # The message queue has not grown in proportion to the requests/responses.
        self.assertLessEqual(client._messages.qsize(), 2)

I'm trying to accommodate asyncio.create_task inside the test_async_and_sync decorator.

What do you think?

Collaborator

What about using subscriptions? Can those stay in the queue?

Collaborator Author

I'm not able to test the Subscribe method correctly. Contrary to my understanding, this code prints only one message and then gets stuck. It does not indefinitely print the stream of messages from the other peer.

What am I missing? The structure of the code is similar to the tests here

class TestSubscriptionWebsocketClient(IntegrationTestCase):
    @test_async_and_sync(globals(), websockets_only=True)
    async def test_msg_queue_size(self, client):
        req = Subscribe(streams=[StreamParameter.LEDGER])
        # NOTE: this code will run forever without a timeout, until the process is killed
        await client.send(req)

        # Note: async websocket clients need the "async for" construct to consume the
        # stream. Alternatively, we would need to implement the __aiter__ method in the
        # respective class.
        async for message in client:
            print(message)
            print("Queue size: ")
            print(client._messages.qsize())

Collaborator

If you're connecting to a standalone node, you also need to close ledgers independently. A standalone node won't do that automatically for you. That's why there's the additional await accept_ledger_async() in the for loop in the test.

Alternatively, you can add use_testnet=True in the test_async_and_sync decorator.
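
Concretely, the earlier subscription snippet could be adapted along these lines for a standalone node (a sketch; accept_ledger_async is the helper mentioned above, and the loop bound is an assumption added so the test terminates):

class TestSubscriptionWebsocketClient(IntegrationTestCase):
    @test_async_and_sync(globals(), websockets_only=True)
    async def test_msg_queue_size(self, client):
        await client.send(Subscribe(streams=[StreamParameter.LEDGER]))

        received = 0
        async for message in client:
            # A standalone node will not close ledgers on its own; close one here
            # so the LEDGER stream keeps producing messages instead of stalling.
            await accept_ledger_async()
            print("Queue size:", client._messages.qsize())
            received += 1
            if received >= 5:  # bound the otherwise endless stream for a test run
                break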

Collaborator

Might also be worth considering rewriting the use_testnet=True parameter into something that'll also allow you to connect to mainnet (i.e. not a boolean, so you can connect to other things as well).
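
One possible shape for that suggestion, purely as an illustration (none of these names exist in the codebase, and the endpoint values are only examples):

from enum import Enum


class TestNetwork(Enum):
    """Hypothetical replacement for the use_testnet boolean."""

    STANDALONE = "ws://127.0.0.1:6006"
    TESTNET = "wss://s.altnet.rippletest.net:51233"
    MAINNET = "wss://xrplcluster.com"


# e.g. @test_async_and_sync(globals(), websockets_only=True, network=TestNetwork.MAINNET)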

Collaborator Author

Got it, thanks.

Subscribe does use the above message queue. Every response from the subscription stream adds one message, but that message is removed almost immediately. I don't know which method is popping the messages off, but the queue stays empty.

Are you proposing that we use the Subscribe request for the message-queue tests? But it is not representative of a request-response back-and-forth.

Collaborator

Ok, fair. I was just thinking about commands that would legitimately grow the queue.


# enqueue the response for the message queue
cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict)
# a response that fulfills a future is not enqueued again
Collaborator

This comment is more confusing than useful IMO.

Collaborator Author

fixed in 3a53ffc

@ckeshava ckeshava requested a review from mvadari July 22, 2024 21:02
class TestWebsocketClient(IntegrationTestCase):
    """Memory usage of websocket client"""

    @test_async_and_sync(globals(), websockets_only=True)
Collaborator

This test is against standalone, not mainnet. The data isn't really going to test anything.

Collaborator Author

Testing in standalone mode provides a useful baseline, because the performance will only get worse on Mainnet/Testnet.

I'm measuring the memory usage of the client when multiple successive requests are sent to it. (Since no new transactions are submitted, there is no need to advance the ledger.)

I can set use_testnet=True; I'm not opposed to that.

Collaborator Author

The current integration test framework does not easily let us test against the mainnet (unless we write additional utility methods). I'm assuming you're referring to the use_testnet parameter in the test_async_and_sync decorator.

Collaborator

The integration test framework can be changed; it's not set in stone.

Collaborator Author

Are you suggesting that I test the performance against the mainnet? Why would that be different from testing against the testnet or a standalone node?

The performance of the xrpl-py client is what is in question; does it matter whether I use a standalone/testnet/mainnet server for this test?

@mvadari mvadari Jul 24, 2024

There isn't enough traffic on testnet/standalone to test the performance of xrpl-py; it's not going to run into any issues. There may not even be any data in this pair (or any pair) during the few seconds that you're testing. You could theoretically generate that traffic yourself, but that would require a much more complex test, and I'm not sure it's worth that level of complexity.
