PIP 74: Pulsar client memory limits
- Status: Proposal
- Author: Matteo Merli
- Pull Request:
- Mailing List discussion:
- Release:
Currently, there are multiple settings in producers and consumers that control the sizes of the message queues. These are ultimately used to control the amount of memory used by the Pulsar client.
There are a few issues with the current approach that make it complicated, in non-trivial scenarios, to select an overall configuration that leads to a predictable use of memory:
1. The settings are in terms of "number of messages", so one has to adjust them based on the expected message size.
2. The settings are per producer/consumer. If an application has a large (or simply unknown) number of producers/consumers, it's very difficult to select an appropriate value for the queue sizes. The same is true for topics that have many partitions.
This proposal is to allow specifying a maximum amount of memory on a given Pulsar client. The producers and consumers will compete for the memory assigned.
The scope of this proposal is to track the "direct" memory used to hold outgoing or incoming message payloads. The usage of JVM heap memory is outside the scope, though it normally represents only a small fraction of the payload size.
The change will be done in multiple steps:
1. Introduce a new API to set the memory limit, keeping it disabled by default
   1.1. Implement the memory limit for producers
2. Implement the memory limit for consumers
3. (Once the feature is stable) Enable the memory limit by default, with 64 MB
4. Deprecate the producer queue size related settings
5. Ignore the producer queue size related settings
public class ClientBuilder {
// ....
/**
* Configure a limit on the amount of direct memory that will be allocated by this client instance.
* <p>
* Setting this to 0 will disable the limit.
*
* @param memoryLimit
* the limit
* @param unit
* the memory limit size unit
* @return the client builder instance
*/
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
}
For example:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.memoryLimit(64, SizeUnit.MEGA_BYTES)
    .build();
The same logic described here will be implemented first in Java and then in the rest of the supported client libraries.
The enforcement of the memory limit is done in a different way for producers as compared to consumers, although both will share a common instance of a MemoryLimitController, which is unique per PulsarClient instance.
interface MemoryLimitController {
// Non-blocking
boolean tryReserveMemory(long size);
// Blocks until memory is available
void reserveMemory(long size) throws InterruptedException;
void releaseMemory(long size);
}
The implementation of MemoryLimitController will be optimized to avoid contention across multiple threads trying to reserve and release memory.
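As a rough illustration, a minimal low-contention implementation could keep the usage counter in a single AtomicLong, using a CAS loop for the lock-free fast path and falling back to a lock and condition only when a reservation actually has to block. The class and field names below are illustrative, not part of the proposal:

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleMemoryLimitController implements MemoryLimitController {
    private final long memoryLimit;
    private final AtomicLong currentUsage = new AtomicLong();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition memoryReleased = lock.newCondition();

    SimpleMemoryLimitController(long memoryLimitBytes) {
        this.memoryLimit = memoryLimitBytes;
    }

    @Override
    public boolean tryReserveMemory(long size) {
        // Lock-free fast path: optimistic CAS loop on the shared counter
        while (true) {
            long current = currentUsage.get();
            long newUsage = current + size;
            // A limit of 0 means the feature is disabled
            if (memoryLimit > 0 && newUsage > memoryLimit) {
                return false;
            }
            if (currentUsage.compareAndSet(current, newUsage)) {
                return true;
            }
        }
    }

    @Override
    public void reserveMemory(long size) throws InterruptedException {
        // Only touch the lock when the reservation has to block
        if (!tryReserveMemory(size)) {
            lock.lock();
            try {
                while (!tryReserveMemory(size)) {
                    memoryReleased.await();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    @Override
    public void releaseMemory(long size) {
        currentUsage.addAndGet(-size);
        lock.lock();
        try {
            memoryReleased.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

A production version would likely avoid taking the lock in releaseMemory when no thread is waiting, but the structure above conveys the intent.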
If the memory limit is set, the producer will first try to acquire the semaphore that is currently sized according to the producer queue size, and then try to reserve the memory, in either a blocking or non-blocking way, depending on the producer configuration.
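As an illustration of that order, the producer-side check could look like the sketch below; canEnqueueMessage, semaphore and blockIfQueueFull are hypothetical stand-ins for the existing producer internals, not proposed API:

// semaphore: the existing per-producer queue-size semaphore
// memoryLimitController: the instance shared across the whole client
boolean canEnqueueMessage(int payloadSize) throws InterruptedException {
    if (blockIfQueueFull) {
        // Blocking mode: wait for a queue slot first, then for memory
        semaphore.acquire();
        memoryLimitController.reserveMemory(payloadSize);
        return true;
    } else {
        // Non-blocking mode: fail the send if either resource is unavailable
        if (!semaphore.tryAcquire()) {
            return false;
        }
        if (!memoryLimitController.tryReserveMemory(payloadSize)) {
            semaphore.release();
            return false;
        }
        return true;
    }
}

Both the queue slot and the reserved memory would be released once the broker acknowledges the send.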
The logic for consumers is slightly more complicated, because a client needs to give permits to brokers to push messages. Right now, the count is done in terms of messages, and it cannot simply be switched to "bytes", because a consumer will need to consume from multiple topics and partitions.
Because of that, we need to invert the order, by using the memory limit controller as a way to interact with the flow control: reserving memory after we have already received the messages.
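A sketch of that inverted order follows. It assumes a hypothetical forceReserveMemory() that records usage unconditionally, since the bytes have already arrived and the reservation cannot be refused; the limit instead drives the dynamic receiver queue described next:

// Called from the connection handler when the broker pushes a message
void messageReceived(Message<?> msg) {
    memoryLimitController.forceReserveMemory(msg.getData().length);
    incomingMessages.add(msg);
}

// Called when the application takes a message out of the receiver queue
Message<?> internalReceive() throws InterruptedException {
    Message<?> msg = incomingMessages.take();
    memoryLimitController.releaseMemory(msg.getData().length);
    increaseAvailablePermits(1); // flow control: return a permit to the broker
    return msg;
}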
The proposal is to keep the receiver queue size as a configuration, though treating it more as the "max receiver queue size".
When a consumer is created and the memory limit is set, it will use these parameters:
* maxReceiverQueueSize: the value configured by the application
* currentReceiverQueue: a dynamic limit that will be auto-adjusted, starting from an initial value (eg: 1) up to the max
The goal is to step up the currentReceiverQueue if and only if:
1. Doing it would improve throughput
2. We have enough memory to do it
The rationale for (1) is that increasing currentReceiverQueue increases the amount of pre-fetched messages for the consumer. The goal of the prefetch queue is to make sure an application calling receive() will never be blocked waiting, if there are messages available.
Therefore, we can determine that the currentReceiverQueue is limiting the throughput if:
1. The application calls receive() and there are no pre-fetched messages
2. And we know that there are messages pending to be sent for this consumer. For this, in the case of exclusive/failover subscriptions, we can rely on the last message id, while for other subscription types, we need to introduce a new special command to brokers that will tell whether the broker is waiting on the consumer for more permits.
If these conditions are true, we can increase the currentReceiverQueue to increase the load.
With this mechanism, we can start with a very small window and expand as needed, maintaining the minimum size that is able to sustain the requested throughput.
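To make the detection concrete, the receive path could look like the sketch below; incomingMessages (the local pre-fetch queue) and hasMessagesAvailable() (backed by the last message id for exclusive/failover subscriptions, or by the new broker command for the other types) are hypothetical names, and the memory-gated expandReceiverQueue() is sketched after the thresholds below:

Message<?> receive() throws InterruptedException {
    if (incomingMessages.isEmpty() && hasMessagesAvailable()) {
        // Condition (1): the application would block with no pre-fetched
        // messages. Condition (2): the broker still has messages pending for
        // this consumer. currentReceiverQueue is therefore limiting throughput.
        expandReceiverQueue();
    }
    return internalReceive();
}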
In order to limit the consumer memory, we would only increase the currentReceiverQueue if the memory usage is below a certain threshold (eg: 75%). Also, if the usage reaches a higher threshold (eg: 95%), we will reduce the currentReceiverQueue for all the consumers.
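One possible shape for the threshold logic is sketched below, assuming the illustrative 75%/95% values, a hypothetical memoryUsageRatio() accessor on the shared controller, and a doubling/halving step policy; none of these are mandated by the proposal:

private static final double EXPAND_THRESHOLD = 0.75; // eg: 75%
private static final double SHRINK_THRESHOLD = 0.95; // eg: 95%

void expandReceiverQueue() {
    // Only grow the window when overall memory usage leaves enough headroom
    if (memoryUsageRatio() < EXPAND_THRESHOLD
            && currentReceiverQueue < maxReceiverQueueSize) {
        int newSize = Math.min(currentReceiverQueue * 2, maxReceiverQueueSize);
        // Grant the broker additional permits to match the larger window
        increaseAvailablePermits(newSize - currentReceiverQueue);
        currentReceiverQueue = newSize;
    }
}

// Invoked on all consumers of this client when usage crosses the high mark
void shrinkReceiverQueueOnMemoryPressure() {
    if (memoryUsageRatio() > SHRINK_THRESHOLD) {
        currentReceiverQueue = Math.max(currentReceiverQueue / 2, 1);
    }
}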