-
-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RPC Implementation using multiple relays to enable async communication #25
Conversation
…pportunities to lose a relay
…ting in lost relays (actually improves performance as well)
…more test coverage
…ll the sockets created
WalkthroughThe recent updates to the Goridge library enhance its functionality with interfaces for connection management, asynchronous RPC capabilities, and multi-relay support. Changes streamline RPC communication by refining method interfaces, improving connection handling, and extending testing coverage. These modifications aim to bolster reliability, efficiency, and ease of use in RPC communications within the Spiral framework, particularly for RoadRunner services. Changes
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
Hey @rustatian , @wolfy-j , @msmakouz , @roxblnfk , took a bit of maneuvering to get the commits across but this is the new PR with the latest changes in place. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 3
Configuration used: CodeRabbit UI
Files ignored due to path filters (1)
composer.json
is excluded by:!**/*.json
Files selected for processing (15)
- src/ConnectedRelayInterface.php (1 hunks)
- src/MultiRelayHelper.php (1 hunks)
- src/RPC/AbstractRPC.php (1 hunks)
- src/RPC/AsyncRPCInterface.php (1 hunks)
- src/RPC/MultiRPC.php (1 hunks)
- src/RPC/RPC.php (4 hunks)
- src/RPC/RPCInterface.php (1 hunks)
- src/SocketRelay.php (4 hunks)
- src/StreamRelay.php (1 hunks)
- tests/Goridge/MsgPackMultiRPCTest.php (1 hunks)
- tests/Goridge/MultiRPC.php (1 hunks)
- tests/Goridge/MultiRelayHelperTest.php (1 hunks)
- tests/Goridge/TCPMultiRPCTest.php (1 hunks)
- tests/Goridge/TCPRPCTest.php (1 hunks)
- tests/test-server/server.go (2 hunks)
Files skipped from review due to trivial changes (1)
- tests/Goridge/TCPRPCTest.php
Additional comments: 18
tests/Goridge/TCPMultiRPCTest.php (1)
- 10-14: The implementation of the
TCPMultiRPCTest
class with constants for socket address, port, and type is correctly done and follows best practices for defining such constants in a test class for TCP communication.src/RPC/RPCInterface.php (1)
- 29-29: The correction of the typo in the method description from "remove" to "remote" improves the clarity and accuracy of the documentation. Good attention to detail.
tests/Goridge/MultiRelayHelperTest.php (2)
- 9-26: The tests for
MultiRelayHelper
's ability to work withStreamRelay
instances are well-structured and correctly implemented, using PHPUnit's framework methods appropriately.- 18-25: The setup and execution of the test
testSupportsReadingFromStreamRelay
are correctly done, demonstrating proper use of streams for testing and appropriate assertions.src/ConnectedRelayInterface.php (1)
- 12-35: The
ConnectedRelayInterface
is well-designed, covering essential functionalities for a connected relay with clear documentation comments. The inclusion of methods for connection management and enforcing the__clone
method implementation is appropriate.tests/Goridge/MsgPackMultiRPCTest.php (1)
- 11-33: The tests for JSON exceptions in various scenarios and the override of the
makeRPC
method to useMsgpackCodec
are correctly implemented, demonstrating proper exception handling and test setup.src/RPC/RPC.php (1)
- 4-24: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [13-41]
The redesign of the
RPC
class to extendAbstractRPC
and streamline its functionality demonstrates an improvement in modularity and maintainability. Ensure that these changes are consistent with the overall architecture and properly handle any potential breaking changes.tests/test-server/server.go (1)
- 65-70: The addition of the
SleepEcho
function to simulate a delayed response is correctly implemented and serves its intended purpose for testing asynchronous capabilities. Good use oftime.Sleep
for delay simulation.src/RPC/AbstractRPC.php (1)
- 16-89: The
AbstractRPC
class provides a well-designed foundation for RPC communication, with flexible methods for setting service prefixes and codecs, and encapsulating essential functionalities. This design promotes modularity and code reuse.src/RPC/AsyncRPCInterface.php (1)
- 10-68: The
AsyncRPCInterface
is well-designed and clearly outlines the contract for asynchronous RPC communication, with methods covering the entire lifecycle of asynchronous calls. The interface follows best practices for interface definition.src/MultiRelayHelper.php (1)
- 10-89: The utility methods in
MultiRelayHelper
for finding relays with messages and checking connected relays are correctly implemented and provide essential functionalities for working with multiple relays. The use ofsocket_select
andstream_select
is appropriate and efficient.src/StreamRelay.php (1)
- 25-29: Changing the visibility of the
$in
property topublic
is justified by the need to enablestream_select
across multipleStreamRelays
. While making properties public can increase the risk of unintended modifications, the internal documentation explains the rationale behind this decision. It's important to understand the implications of such changes and the specific context that necessitated this decision.src/SocketRelay.php (2)
- 27-27: The class now implements
ConnectedRelayInterface
in addition toStringable
. This change aligns with the PR's objective to enhance async RPC communication by ensuring relay classes support connected operations.- 113-117: Adding a
__clone
method to nullify thesocket
reference on cloning is a good practice. It prevents unintended sharing of socket resources between cloned instances, which could lead to confusing bugs or resource leaks. This change enhances the robustness of the class when dealing with cloned objects.src/RPC/MultiRPC.php (4)
- 121-134: The
call
method implementation for synchronous RPC calls within an asynchronous context is correctly implemented. It ensures a free relay is available, sends the request, waits for the response, and decodes it. This method provides backward compatibility for synchronous calls, which is crucial for gradual migration to async operations. The method's error handling and sequence management are also well implemented.- 136-148: The
callIgnoreResponse
method correctly implements the functionality to send a request without waiting for a response. This method is essential for fire-and-forget scenarios where the response is not critical. However, it's important to ensure that the documentation clearly states the potential for lost responses and how the system handles such scenarios to prevent confusion or misuse.- 150-173: The
callAsync
method introduces the capability to make asynchronous calls and manage responses later. This method is a cornerstone of the async functionality, allowing for non-blocking operations. The implementation, including buffer management and sequence tracking, is well thought out. However, ensure thorough testing, especially around edge cases like buffer overflow and sequence management, to prevent potential issues in production environments.- 175-188: The
hasResponse
method provides a way to check if a response for a given sequence number is available, either in the buffer or directly from the relay. This method is crucial for managing async responses efficiently. The implementation appears correct, but ensure that the documentation clearly explains the sequence management and how to handle cases where a response might not be available due to various reasons (e.g., buffer overflow, sequence mismatch).
/** | ||
* @internal | ||
* This isn't really ideal but there's no easy way since we need access to the underlying socket | ||
* to do a socket_select across multiple SocketRelays. | ||
*/ | ||
public ?Socket $socket = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing the visibility of the socket
property to public and adding documentation about its internal use case is a significant change. While this facilitates access for socket_select
across multiple SocketRelays
, exposing the Socket
object publicly could lead to unintended manipulations outside the class's control. Consider encapsulating the socket operations within the class or providing a controlled access method to mitigate potential risks.
tests/Goridge/MultiRPC.php
Outdated
public function testAllowsOnlySockets(): void | ||
{ | ||
$this->expectException(RPCException::class); | ||
$this->expectExceptionMessage( | ||
sprintf( | ||
"MultiRPC can only be used with relays implementing the %s, such as %s", | ||
ConnectedRelayInterface::class, | ||
SocketRelay::class | ||
) | ||
); | ||
new GoridgeMultiRPC([new StreamRelay(STDIN, STDOUT)]); | ||
} | ||
|
||
public function testNeedsAtLeastOne(): void | ||
{ | ||
$this->expectException(RPCException::class); | ||
$this->expectExceptionMessage("MultiRPC needs at least one relay. Zero provided."); | ||
new GoridgeMultiRPC([]); | ||
} | ||
|
||
public function testChecksIfResponseIsInRelay(): void | ||
{ | ||
$id = $this->rpc->callAsync('Service.Ping', 'ping'); | ||
// Wait a bit | ||
usleep(100 * 1000); | ||
|
||
$this->assertTrue($this->rpc->hasResponse($id)); | ||
} | ||
|
||
public function testChecksIfResponseIsInBuffer(): void | ||
{ | ||
$id = $this->rpc->callAsync('Service.Ping', 'ping'); | ||
// Wait a bit | ||
usleep(100 * 1000); | ||
$this->forceFlushRpc(); | ||
|
||
$this->assertTrue($this->rpc->hasResponse($id)); | ||
} | ||
|
||
public function testChecksIfResponseIsNotReceivedYet(): void | ||
{ | ||
$id = $this->rpc->callAsync('Service.Ping', 'ping'); | ||
$this->assertFalse($this->rpc->hasResponse($id)); | ||
} | ||
|
||
public function testChecksMultipleResponses(): void | ||
{ | ||
$ids = []; | ||
$ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); | ||
$this->forceFlushRpc(); | ||
$ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); | ||
usleep(100 * 1000); | ||
$ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); | ||
$responses = $this->rpc->hasResponses($ids); | ||
$this->assertContains($ids[0], $responses); | ||
$this->assertContains($ids[1], $responses); | ||
$this->assertNotContains($ids[2], $responses); | ||
} | ||
|
||
public function testHasResponsesReturnsEmptyArrayWhenNoResponses(): void | ||
{ | ||
$id = $this->rpc->callAsync('Service.Ping', 'ping'); | ||
$this->assertEmpty($this->rpc->hasResponses([$id])); | ||
} | ||
|
||
public function testGetResponsesReturnsWhenNoRelaysAvailableToAvoidInfiniteLoop(): void | ||
{ | ||
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); | ||
$property->setValue([]); | ||
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); | ||
$property->setValue([]); | ||
$this->expectedNumberOfRelays = 0; | ||
$this->expectException(RPCException::class); | ||
$this->expectExceptionMessage("No relays available at all"); | ||
$this->rpc->call('Service.Ping', 'ping'); | ||
} | ||
|
||
public function testMultiRPCIsUsableWithOneRelay(): void | ||
{ | ||
$this->makeRPC(1); | ||
$this->rpc->callIgnoreResponse('Service.Ping', 'ping'); | ||
$this->rpc->callIgnoreResponse('Service.SleepEcho', 'Hello'); | ||
$id = $this->rpc->callAsync('Service.Ping', 'ping'); | ||
$this->rpc->callIgnoreResponse('Service.Echo', 'Hello'); | ||
$this->assertSame('pong', $this->rpc->getResponse($id)); | ||
} | ||
|
||
protected function setUp(): void | ||
{ | ||
$this->makeRPC(); | ||
} | ||
|
||
protected function makeRPC(int $count = 10): void | ||
{ | ||
$type = self::SOCK_TYPE->value; | ||
$address = self::SOCK_ADDR; | ||
$port = self::SOCK_PORT; | ||
$this->rpc = GoridgeMultiRPC::create("$type://$address:$port", $count); | ||
$this->expectedNumberOfRelays = $count; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test suite for the new async RPC functionality is comprehensive and covers a wide range of scenarios. However, there are a few areas that could be improved for better maintainability and clarity:
-
Duplication in Test Setup: The
makeRPC
method is called in both thesetUp
method and directly in some test methods (e.g.,testMultiRPCIsUsableWithOneRelay
). Consider ensuring that the setup method adequately prepares the test environment for all tests to reduce direct calls tomakeRPC
within test methods, unless a specific test case requires a unique setup. -
Exception Handling in Async Tests: In tests where exceptions are expected (e.g.,
testInvalidServiceAsync
,testInvalidMethodAsync
), it's good practice to place only the code expected to throw the exception within theexpectException
block. This helps in precisely identifying the operation that triggers the exception. -
Reflection Usage: The tests make extensive use of reflection to access private properties and methods. While this is sometimes necessary for testing, consider if there are ways to test the desired outcomes without relying on reflection. This could involve adding appropriate public methods for testability or re-evaluating what needs to be tested directly.
-
Large Data Handling: Tests like
testLongEcho
andtestLongEchoAsync
use large payloads to test the system's handling of significant data sizes. Ensure that these tests do not disproportionately affect the overall test suite's execution time or resource usage. -
Error Message Clarity: In tests that check for specific exceptions and messages (e.g.,
testCannotGetSameResponseTwice
), ensure that the error messages are clear and informative. This helps in diagnosing issues when tests fail. -
Test Coverage for New Functionality: Ensure that the test suite comprehensively covers the new async functionality, including edge cases and failure modes. This might include more detailed testing around relay management, error handling in async contexts, and the behavior under high load or network issues.
-
Documentation and Comments: Adding comments to complex test cases or setup procedures can improve maintainability. This is especially useful for future contributors or when revisiting the tests after some time.
Overall, the test suite is a solid foundation for ensuring the reliability and performance of the new async RPC functionality. Addressing the above points can further enhance its effectiveness and maintainability.
Consider addressing the points mentioned for improvements in maintainability, clarity, and comprehensive coverage.
fix: Allowed usage of StreamRelay in MultiRPC since cloning isn't an issue Also added a few more tests and failure checks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (4)
- src/MultiRelayHelper.php (1 hunks)
- src/RPC/MultiRPC.php (1 hunks)
- tests/Goridge/MultiRPC.php (1 hunks)
- tests/Goridge/MultiRelayHelperTest.php (1 hunks)
Files skipped from review as they are similar to previous changes (4)
- src/MultiRelayHelper.php
- src/RPC/MultiRPC.php
- tests/Goridge/MultiRPC.php
- tests/Goridge/MultiRelayHelperTest.php
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## 4.x #25 +/- ##
=============================================
+ Coverage 70.52% 81.53% +11.00%
- Complexity 120 198 +78
=============================================
Files 9 12 +3
Lines 302 509 +207
=============================================
+ Hits 213 415 +202
- Misses 89 94 +5 ☔ View full report in Codecov by Sentry. |
I've pushed the aforementioned fix, alongside a couple of additional tests and more checks in MultiRPC to make sure it's running correctly. I've also added a lengthy comment explaining the decision behind using static properties rather than instance variables. Tests should pass now. Let me know if anything is amiss :) |
@L3tum 👋 |
@L3tum I'm not sure, is that really needed to include a Go code in the PHP tests? Might be it would be better to use RR binary? |
@rustatian I'm not well versed in the out-of-the-box capabilities of RR for whether it'd be possible to replace all the tests. In the end it would probably make sense to at least add integration tests to all of the PHP SDK packages. But I'll defer that decision to the maintainers of it :) |
@L3tum Thanks for the work you've done! |
@msmakouz No problem, thanks for reviewing&merging it! |
Implements an "Async" RPC Interface and Implementation using multiple relays to effectively offer non-blocking IO in regards to the Roadrunner communication.
As discussed in the linked discussion, there's a (small but signifcant) delay in the RPC communication to Roadrunner that can have an impact on a service. In my case the service had a nominal response time of ~2ms, went up to 3ms with metric collection and also had a 2ms
kernel.terminate
listener to collect further metrics, so all in all was busy for ~5ms with only ~2ms spent on the actual request->response cycle.As a fix I propose an "async" implementation of the RPC interface. I don't really like calling it async, because it doesn't offer the same versatility as a proper async implementation, but it changes the control flow and thus qualifies for that name, I guess.
The interface has 3 significant new methods as well as a few supporting changes in other code:
hasResponse
andhasAnyResponse
just check if a response has been receivedResults
I've measured the impact in a testcase using the same repro used in the discussion. The results are the following:
As you can see the "good" implementation (this change) is within the normal deviation of the always-ignore implementation and offers an improvement ~10x performance. I've been using a (slightly modified) implementation of this in the aforementioned production service and could cut down the time spent collecting metrics from a total of 3ms to just 0,2ms, with a min/max of respectively 0.1ms and 0.3ms.
I've also added a change (PR following) in roadrunner-kv that uses the
callAsync
method to implement asetAsync
function. The results so far are promising. With a follow-up check on whether thekv.Set
call was successful, the async implementation is significantly faster:Full results for the Metrics calls:
Considerations
socket_select
andstream_select
. I've commented it as well as marked them with@internal
but it's definitely not ideal.The overflow response buffer currently has no upper limit and thus it's somewhat easy to get a memory leak here. I could add a check inImplemented a flush at 1000 entries (realistically the most a normal application should ever see is maybe ~50) -- I've upped this to 10_000 entries because in testing in particular Doctrine caches get hammered extremely hard and 1_000 is not enough.callAsync
to see if the response buffer is above a certain limit (1000?) and if so flush it. I'll probably do so when adding the tests.TODO
I have glanced at the tests in this repo and will try to implement a few for this change if you've given me the go-ahead with the current implementation.Added a number of testsSummary by CodeRabbit
ConnectedRelayInterface
for explicit connection management.MultiRelayHelper
for relay selection and connection status checks.AbstractRPC
,AsyncRPCInterface
, andMultiRPC
for synchronous and asynchronous calls.SocketRelay
andStreamRelay
changes for improved cloning and visibility adjustments.RPCInterface
method description.RPC
class to extendAbstractRPC
with changes in constructor and method adjustments.SocketRelay
andStreamRelay
.SleepEcho
function in test server for delayed response simulation.