An unofficial PHP gRPC implementation that supports asynchronous calls. The implementation is taken from @arnaud-lb's php-async branch.
This document describes how to interact with my own gRPC binding library.
PHP does not have a built-in event loop, so a user-space event loop is required to check whether a call has completed. The
method below must be called periodically as a scheduled task. Its first parameter is the timeout for the polling
mechanism; setting it to `PHP_INT_MIN` makes it process any pending events and return immediately, taking no more than O(1).
Call::drainCompletionQueue(PHP_INT_MIN);
Each invocation makes the completion queue check once whether any calls have completed.
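The periodic polling described above can be sketched as a small user-space loop. This is only an illustration under stated assumptions: `runLoop`, `$drain`, `$hasWork` and `$tick` are hypothetical names, not part of the library, and `$drain` stands in for a call such as `Call::drainCompletionQueue(PHP_INT_MIN)` so that the sketch runs without the extension loaded.

```php
<?php
declare(strict_types=1);

/**
 * Runs application work and polls for completed calls once per iteration.
 *
 * @param callable $drain   Hypothetical stand-in for a non-blocking poll such as
 *                          Call::drainCompletionQueue(PHP_INT_MIN).
 * @param callable $hasWork Returns false when the loop should stop.
 * @param callable $tick    One unit of other application work.
 * @return int Number of iterations performed.
 */
function runLoop(callable $drain, callable $hasWork, callable $tick): int
{
    $iterations = 0;
    while ($hasWork()) {
        $tick();  // Do other application work first.
        $drain(); // Then give pending RPC callbacks a chance to fire.
        $iterations++;
    }
    return $iterations;
}

// Demonstration with stand-ins: the loop stops after three iterations.
$remaining = 3;
$drained = 0;
$count = runLoop(
    function () use (&$drained): void { $drained++; },
    function () use (&$remaining): bool { return $remaining > 0; },
    function () use (&$remaining): void { $remaining--; }
);
```

In a real application the `$drain` argument would wrap the library call, and the loop would typically belong to whichever scheduler or event-loop package the project already uses.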
Currently, all types of user call are supported. Thanks to the new asynchronous implementation, bidirectional streaming is now possible in PHP without blocking the currently executing thread (provided the RPC uses the asynchronous methods). The table below shows which service methods are supported for asynchronous and synchronous operation.
Service Method | Asynchronous | Synchronous |
---|---|---|
Unary RPC | Supported | Supported |
Server Streaming RPC | Supported | Supported |
Client Streaming RPC | Supported | Supported |
Bidirectional RPC | Supported | Supported |
All service methods in PHP now require a callback to handle
messages received from the server. Where documented, the callback is given an object: usually the
message consumed from the server and/or an `RpcCallStatus`
that contains information about the RPC once it has completed.
Each `AbstractCall` now implements an interface indicating that the call is a `ClientCallInterface` and/or a
`ServerCallInterface`. The implementing methods are similar across call types, and clear, concise documentation can be
found in the interfaces themselves.
Interface | Unary | Server Streaming | Client Streaming | Bidirectional |
---|---|---|---|---|
ServerCallInterface | Yes | Yes | No | Yes |
ClientCallInterface | No | No | Yes | Yes |
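The table above suggests that code can branch on which interfaces a call implements. The sketch below assumes empty stand-in interfaces and fake call classes declared locally so that it runs on its own; the real interfaces live in the library, their namespace is not shown here, and `describeCall` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

// Stand-in declarations (assumptions): the real interfaces come from the library.
interface ServerCallInterface {}
interface ClientCallInterface {}

// Fake call classes mirroring the rows of the table.
final class FakeServerStreamingCall implements ServerCallInterface {}
final class FakeClientStreamingCall implements ClientCallInterface {}
final class FakeBidiCall implements ServerCallInterface, ClientCallInterface {}

/** Describes a call by the interfaces it implements (hypothetical helper). */
function describeCall(object $call): string
{
    $receives = $call instanceof ServerCallInterface;
    $sends = $call instanceof ClientCallInterface;

    if ($receives && $sends) {
        return 'bidirectional';
    }
    if ($sends) {
        return 'client streaming';
    }
    if ($receives) {
        return 'unary or server streaming';
    }
    return 'unknown';
}

$bidi = describeCall(new FakeBidiCall());
$client = describeCall(new FakeClientStreamingCall());
$server = describeCall(new FakeServerStreamingCall());
```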
A unary call follows the basic request/response model of HTTP: the client sends a request and expects a single response from the server. The code below shows the preferred way to start a unary call.
use Grpc\Status\RpcCallStatus;
$request = $service->startUnaryCall(new Parameter()); // Method generated by gRPC
$request->onStreamNext(function (Message $message, RpcCallStatus $status): void {
    // Perform operations on $message; this callback is called once.
});
For asynchronous server streaming, you do not need to call `onStreamNext` again after each message:
the previously registered callback is reused to consume the next message received.
use Grpc\Status\RpcCallStatus;
$request = $service->startServerStreamingCall(new Parameter()); // Method generated by gRPC
$request->onStreamCompleted(function (RpcCallStatus $status): void {
    // Indicates that this stream is complete.
});
$request->onStreamNext(function (Message $message): void {
    // Perform operations on $message; this callback is called multiple times.
});
For synchronous operation, however, you must call `onStreamNext` again after each message received
from the server. In other words, the previous callback is not reused to consume the next message. The behaviour
differs from asynchronous operation because a synchronous call blocks the working thread:
if a single `onStreamNext` call handled every message from the server, no other task could run in the meantime.
PHP mostly executes tasks on a single thread, so the options for waiting on a message
from the server are very limited.
Developers therefore have to decide, based on their requirements, how messages are consumed for every `ServerCallInterface`.
use Grpc\Status\RpcCallStatus;
$request = $service->startServerStreamingCall(new Parameter()); // Method generated by gRPC
$request->onStreamCompleted(function (RpcCallStatus $status): void {
    // Indicates that this stream is complete.
});
$consumer = function (Message $message): void {
    // Perform operations on $message; this callback is called multiple times.
};
// This is an example; the behaviour depends on your requirements.
while ($request->isServerReady()) {
    $request->onStreamNext($consumer);
    // ... Do other tasks.
}
A client streaming call is a service method in which the client sends a sequence of messages to the server and the server returns a single response for the client to consume. The snippet below can be used for both asynchronous and synchronous operation.
use Grpc\Status\RpcCallStatus;
$data = []; // Data that contains N values.
$request = $service->startClientStreamingCall(); // Method generated by gRPC
// You MUST wait until the client has finished sending the initial metadata to the server,
// which is why the messages are sent from inside the onClientReady callback.
// For synchronous operation this does not apply, because the initial metadata was
// already sent when the RPC started.
$request->onClientReady(function () use ($request, $data): void {
    while (!empty($data)) {
        $message = array_pop($data); // Takes messages from the end of $data.
        $request->onClientNext($message);
    }
    // After sending all the messages, call this method so that the server knows no more
    // messages are coming and returns the result of processing the messages it received.
    $request->onClientCompleted(function (Message $message, RpcCallStatus $status) {
        // Handle $message consumed from the server.
    });
});
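One detail of the snippet above is worth illustrating: `array_pop` removes elements from the end of the array, so messages leave in reverse insertion order. The sketch below compares that with a plain `foreach`, which preserves insertion order. Both helper functions and the `$send` callable are hypothetical; `$send` stands in for `$request->onClientNext(...)` so that the example runs without the library.

```php
<?php
declare(strict_types=1);

/**
 * Sends messages the way the snippet above does: last element first.
 *
 * @param array    $data Messages to send.
 * @param callable $send Hypothetical stand-in for $request->onClientNext().
 */
function sendLastInFirstOut(array $data, callable $send): void
{
    while (!empty($data)) {
        $send(array_pop($data));
    }
}

/**
 * Sends messages in insertion order.
 *
 * @param array    $data Messages to send.
 * @param callable $send Hypothetical stand-in for $request->onClientNext().
 */
function sendInOrder(array $data, callable $send): void
{
    foreach ($data as $message) {
        $send($message);
    }
}

// Record what each strategy would put on the wire.
$sentReversed = [];
sendLastInFirstOut(['a', 'b', 'c'], function (string $m) use (&$sentReversed): void {
    $sentReversed[] = $m;
});

$sentOrdered = [];
sendInOrder(['a', 'b', 'c'], function (string $m) use (&$sentOrdered): void {
    $sentOrdered[] = $m;
});
```

Which order is right depends on the service; if the server expects messages in the order they were produced, the `foreach` form is the safer choice.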
A bidirectional streaming call combines the client streaming and server streaming methods. It is typically used for long-lived streams and/or processes that exchange multiple inputs and outputs between the client and the server. How it is implemented depends on your use case.
use Grpc\Status\RpcCallStatus;
$request = $this->gsmsService->startBidiStreamingCall();
// Wait for the initial metadata to be sent. For synchronous operation this does not apply,
// and onStreamNext can be called directly.
$request->onClientReady(function () use ($request): void {
    // For synchronous requests, you are required to call this method multiple times.
    $request->onStreamNext(function (Message $response): void {
        // Start listening to updates.
    });
});
$request->onStreamCompleted(function (RpcCallStatus $status): void {
    // Handle the RPC call status on completion.
});
// ... Somewhere else in the code.
// Check whether the RPC is ready to send messages.
if ($request->isClientReady()) {
    $request->onClientNext(new Message()); // Send a message.
}
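Because `isClientReady()` may return false when a message is produced, one possible pattern is to buffer outgoing messages and flush them once the call is ready. The sketch below is an assumption-laden illustration, not part of the library: `OutboundBuffer` is a hypothetical class, and the two callables stand in for `$request->isClientReady()` and `$request->onClientNext(...)`.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper that holds messages until the call can send them. */
final class OutboundBuffer
{
    /** @var array Messages waiting to be sent, oldest first. */
    private $pending = [];
    /** @var callable Stand-in for $request->isClientReady(). */
    private $isReady;
    /** @var callable Stand-in for $request->onClientNext($message). */
    private $send;

    public function __construct(callable $isReady, callable $send)
    {
        $this->isReady = $isReady;
        $this->send = $send;
    }

    /** Queues a message and tries to send everything that is pending. */
    public function push($message): void
    {
        $this->pending[] = $message;
        $this->flush();
    }

    /** Sends pending messages in order while the call is ready; returns how many were sent. */
    public function flush(): int
    {
        $sent = 0;
        while ($this->pending !== [] && ($this->isReady)()) {
            ($this->send)(array_shift($this->pending));
            $sent++;
        }
        return $sent;
    }

    public function pendingCount(): int
    {
        return count($this->pending);
    }
}

// Demonstration with stand-ins: nothing is sent until $ready becomes true.
$ready = false;
$sent = [];
$buffer = new OutboundBuffer(
    function () use (&$ready): bool { return $ready; },
    function ($message) use (&$sent): void { $sent[] = $message; }
);
$buffer->push('x');
$buffer->push('y');
$queuedBeforeReady = $buffer->pendingCount();
$ready = true;
$flushed = $buffer->flush();
```

In an asynchronous setup, `flush()` could be called from the `onClientReady` callback or from the same scheduled task that drains the completion queue.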