Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][cli] Add command line option for configuring the memory limit #20663

Merged
merged 23 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ebbe45a
Putback memory limit
JooHyukKim Jun 27, 2023
e8b262d
Fix check style
JooHyukKim Jun 27, 2023
c82e923
Implement perfUtil and test
JooHyukKim Jun 27, 2023
4062954
Add JavaDoc to memory limit
JooHyukKim Jun 28, 2023
2c2275e
Minimize PerformanceProducer change
JooHyukKim Jun 28, 2023
daae661
Implement perf testclients with memory unit
JooHyukKim Jun 28, 2023
979eaec
Fix check style
JooHyukKim Jun 28, 2023
aa7fe74
Minimize changes
JooHyukKim Jun 28, 2023
f3891af
Add license header
JooHyukKim Jun 28, 2023
2579fed
Merge remote-tracking branch 'upstream/master' into 15912-Add-command…
JooHyukKim Jun 29, 2023
5c1eec3
Refactor to have default value inline
JooHyukKim Jun 30, 2023
7618b99
Merge remote-tracking branch 'upstream/master' into 15912-Add-command…
JooHyukKim Jul 16, 2023
756134f
Merge remote-tracking branch 'upstream/master' into 15912-Add-command…
JooHyukKim Jul 22, 2023
99172b3
Merge branch 'master' into 15912-Add-command-line-option-for-configur…
JooHyukKim Aug 23, 2023
b24f4e0
Import CLI utils
JooHyukKim Aug 23, 2023
d95a36f
Replace temporary converter
JooHyukKim Aug 23, 2023
9f18bc7
Minimize changes by using existing module
JooHyukKim Aug 23, 2023
9525de1
.
JooHyukKim Aug 23, 2023
c1de7c2
Merge remote-tracking branch 'upstream/master' into 15912-Add-command…
JooHyukKim Sep 4, 2023
7c0e29e
Update PerfClientUtils.java
JooHyukKim Sep 4, 2023
18b2099
Merge branch 'master' into 15912-Add-command-line-option-for-configur…
tisonkun Sep 18, 2023
2502f04
fix license check
tisonkun Oct 10, 2023
dc5eed4
Merge branch 'master' into 15912-Add-command-line-option-for-configur…
tisonkun Oct 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void testInitialization() throws InterruptedException, ExecutionException
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
properties.setProperty("memoryLimit", "10M");

String tenantName = UUID.randomUUID().toString();

Expand All @@ -94,6 +95,7 @@ public void testInitialization() throws InterruptedException, ExecutionException
String[] args = { "consume", "-t", "Exclusive", "-s", "sub-name", "-n",
Integer.toString(numberOfMessages), "--hex", "-r", "30", topicName };
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
Assert.assertEquals(pulsarClientToolConsumer.rootParams.memoryLimit, 10 * 1024 * 1024);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
Expand All @@ -108,6 +110,7 @@ public void testInitialization() throws InterruptedException, ExecutionException
String[] args = { "produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r",
"20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName };
Assert.assertEquals(pulsarClientToolProducer.run(args), 0);
Assert.assertEquals(pulsarClientToolProducer.rootParams.memoryLimit, 10 * 1024 * 1024);

future.get();
}
Expand Down Expand Up @@ -342,22 +345,49 @@ public void testArgs() throws Exception {
final String message = "test msg";
final int numberOfMessages = 1;
final String topicName = getTopicWithRandomSuffix("test-topic");
final String memoryLimitArg = "10M";

String[] args = {"--url", url,
"--auth-plugin", authPlugin,
"--auth-params", authParams,
"--tlsTrustCertsFilePath", CA_CERT_FILE_PATH,
"--memory-limit", memoryLimitArg,
"produce", "-m", message,
"-n", Integer.toString(numberOfMessages), topicName};
pulsarClientTool.jcommander.parse(args);
assertEquals(pulsarClientTool.rootParams.getTlsTrustCertsFilePath(), CA_CERT_FILE_PATH);
assertEquals(pulsarClientTool.rootParams.getAuthParams(), authParams);
assertEquals(pulsarClientTool.rootParams.getAuthPluginClassName(), authPlugin);
assertEquals(pulsarClientTool.rootParams.getMemoryLimit(), 10 * 1024 * 1024);
assertEquals(pulsarClientTool.rootParams.getServiceURL(), url);
assertNull(pulsarClientTool.rootParams.getProxyServiceURL());
assertNull(pulsarClientTool.rootParams.getProxyProtocol());
}

@Test(timeOut = 20000)
public void testMemoryLimitArgShortName() throws Exception {
PulsarClientTool pulsarClientTool = new PulsarClientTool(new Properties());
final String url = "pulsar+ssl://localhost:6651";
final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
final String authParams = String.format("tlsCertFile:%s,tlsKeyFile:%s", getTlsFileForClient("admin.cert"),
getTlsFileForClient("admin.key-pk8"));
final String message = "test msg";
final int numberOfMessages = 1;
final String topicName = getTopicWithRandomSuffix("test-topic");
final String memoryLimitArg = "10M";

String[] args = {"--url", url,
"--auth-plugin", authPlugin,
"--auth-params", authParams,
"--tlsTrustCertsFilePath", CA_CERT_FILE_PATH,
"-ml", memoryLimitArg,
"produce", "-m", message,
"-n", Integer.toString(numberOfMessages), topicName};

pulsarClientTool.jcommander.parse(args);
assertEquals(pulsarClientTool.rootParams.getMemoryLimit(), 10 * 1024 * 1024);
}

@Test
public void testParsingProxyServiceUrlAndProxyProtocolFromProperties() throws Exception {
Properties properties = new Properties();
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<artifactId>pulsar-client-messagecrypto-bc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-cli-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.cli.converters.ByteUnitToLongConverter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
Expand All @@ -40,7 +41,6 @@
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.SizeUnit;


public class PulsarClientTool {

@Getter
Expand Down Expand Up @@ -76,6 +76,10 @@ public static class RootParams {

@Parameter(names = { "--tlsTrustCertsFilePath" }, description = "File path to client trust certificates")
String tlsTrustCertsFilePath;

@Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit "
+ "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class)
long memoryLimit = 0L;
}

protected RootParams rootParams;
Expand Down Expand Up @@ -151,6 +155,11 @@ protected void initRootParamsFromProperties(Properties properties) {
this.rootParams.authParams = properties.getProperty("authParams");
this.rootParams.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath");
this.rootParams.proxyServiceURL = StringUtils.trimToNull(properties.getProperty("proxyServiceUrl"));
// setting memory limit
this.rootParams.memoryLimit = StringUtils.isNotEmpty(properties.getProperty("memoryLimit"))
? new ByteUnitToLongConverter("memoryLimit").convert(properties.getProperty("memoryLimit"))
: this.rootParams.memoryLimit;

String proxyProtocolString = StringUtils.trimToNull(properties.getProperty("proxyProtocol"));
if (proxyProtocolString != null) {
try {
Expand All @@ -165,7 +174,7 @@ protected void initRootParamsFromProperties(Properties properties) {

private void updateConfig() throws UnsupportedAuthenticationException {
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES);
.memoryLimit(rootParams.memoryLimit, SizeUnit.BYTES);
Authentication authentication = null;
if (isNotBlank(this.rootParams.authPluginClassName)) {
authentication = AuthenticationFactory.create(rootParams.authPluginClassName, rootParams.authParams);
Expand Down
6 changes: 6 additions & 0 deletions pulsar-testclient/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-cli-utils</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.pulsar.cli.converters.ByteUnitToLongConverter;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -180,6 +181,10 @@ private static class MainArguments {

@Parameter(names = { "--service-url" }, description = "Pulsar Service URL", required = true)
public String serviceURL;

@Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit "
+ "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class)
public long memoryLimit = 0L;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this one we might be able to utilize over at PR #20646

}

// Configuration class for initializing or modifying TradeUnits.
Expand Down Expand Up @@ -318,7 +323,7 @@ public LoadSimulationClient(final MainArguments arguments) throws Exception {
.serviceHttpUrl(arguments.serviceURL)
.build();
client = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.serviceUrl(arguments.serviceURL)
.connectionsPerBroker(4)
.ioThreads(Runtime.getRuntime().availableProcessors())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.slf4j.Logger;

Expand Down Expand Up @@ -66,6 +67,7 @@ public static ClientBuilder createClientBuilderFromArguments(PerformanceBaseArgu
throws PulsarClientException.UnsupportedAuthenticationException {

ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.serviceUrl(arguments.serviceURL)
.connectionsPerBroker(arguments.maxConnections)
.ioThreads(arguments.ioThreads)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Properties;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.cli.converters.ByteUnitToLongConverter;
import org.apache.pulsar.client.api.ProxyProtocol;

/**
Expand Down Expand Up @@ -103,6 +104,10 @@ public abstract class PerformanceBaseArguments {
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name", hidden = true)
public String deprecatedAuthPluginClassName;

@Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit "
+ "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class)
public long memoryLimit;

public abstract void fillArgumentsFromProperties(Properties prop);

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
Expand Down Expand Up @@ -229,6 +230,7 @@ public static void main(String[] args) throws Exception {
long testEndTime = startTime + (long) (arguments.testTime * 1e9);

ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
.memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.enableTransaction(arguments.isEnableTransaction);

PulsarClient pulsarClient = clientBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -512,6 +513,7 @@ private static void runProducer(int producerId,


ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
.memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.enableTransaction(arguments.isEnableTransaction);

client = clientBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -140,6 +141,7 @@ public static void main(String[] args) throws Exception {
};

ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
.memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.enableTls(arguments.useTls);

PulsarClient pulsarClient = clientBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
Expand Down Expand Up @@ -223,6 +224,7 @@ public static void main(String[] args)
}

ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
.memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.enableTransaction(!arguments.isDisableTransaction);

PulsarClient client = clientBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
*/
package org.apache.pulsar.testclient;

import static org.apache.pulsar.client.api.ProxyProtocol.SNI;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.apache.pulsar.client.api.ProxyProtocol.SNI;
import static org.testng.Assert.fail;


public class PerformanceBaseArgumentsTest {

@Test
Expand Down Expand Up @@ -158,4 +158,74 @@ public void fillArgumentsFromProperties(Properties prop) {
tempConfigFile.delete();
}
}

@DataProvider(name = "memoryLimitCliArgumentProvider")
public Object[][] memoryLimitCliArgumentProvider() {
return new Object[][] {
{ new String[]{"-ml","1"}, 1L},
{ new String[]{"-ml","1K"}, 1024L},
{ new String[]{"--memory-limit", "1G"}, 1024 * 1024 * 1024}
};
}

@Test(dataProvider = "memoryLimitCliArgumentProvider")
public void testMemoryLimitCliArgument(String[] cliArgs, long expectedMemoryLimit) {
for (String cmd : List.of(
"pulsar-perf read",
"pulsar-perf produce",
"pulsar-perf consume",
"pulsar-perf transaction"
)) {
// Arrange
AtomicBoolean called = new AtomicBoolean();
final PerformanceBaseArguments baseArgument = new PerformanceBaseArguments() {
@Override
public void fillArgumentsFromProperties(Properties prop) {
called.set(true);
}
};
baseArgument.confFile = "./src/test/resources/perf_client1.conf";

// Act
baseArgument.parseCLI(cmd, cliArgs);

// Assert
assertEquals(baseArgument.memoryLimit, expectedMemoryLimit);
}
}

@DataProvider(name = "invalidMemoryLimitCliArgumentProvider")
public Object[][] invalidMemoryLimitCliArgumentProvider() {
return new Object[][] {
{ new String[]{"-ml","-1"}},
{ new String[]{"-ml","1C"}},
{ new String[]{"--memory-limit", "1Q"}}
};
}

@Test
public void testMemoryLimitCliArgumentDefault() {
for (String cmd : List.of(
"pulsar-perf read",
"pulsar-perf produce",
"pulsar-perf consume",
"pulsar-perf transaction"
)) {
// Arrange
AtomicBoolean called = new AtomicBoolean();
final PerformanceBaseArguments baseArgument = new PerformanceBaseArguments() {
@Override
public void fillArgumentsFromProperties(Properties prop) {
called.set(true);
}
};
baseArgument.confFile = "./src/test/resources/perf_client1.conf";

// Act
baseArgument.parseCLI(cmd, new String[]{});

// Assert
assertEquals(baseArgument.memoryLimit, 0L);
}
}
}
2 changes: 1 addition & 1 deletion src/check-binary-license.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ if [ -z $TARBALL ]; then
exit 1
fi

JARS=$(tar -tf $TARBALL | grep '\.jar' | grep -v 'trino/' | grep -v '/examples/' | grep -v '/instances/' | grep -v pulsar-client | grep -v pulsar-common | grep -v pulsar-package | grep -v pulsar-websocket | grep -v bouncy-castle-bc | sed 's!.*/!!' | sort)
JARS=$(tar -tf $TARBALL | grep '\.jar' | grep -v 'trino/' | grep -v '/examples/' | grep -v '/instances/' | grep -v pulsar-client | grep -v pulsar-cli-utils | grep -v pulsar-common | grep -v pulsar-package | grep -v pulsar-websocket | grep -v bouncy-castle-bc | sed 's!.*/!!' | sort)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... this was it. Thank you @tisonkun ! 😭


LICENSEPATH=$(tar -tf $TARBALL | awk '/^[^\/]*\/LICENSE/')
LICENSE=$(tar -O -xf $TARBALL "$LICENSEPATH")
Expand Down