Skip to content

Commit

Permalink
[improve][test][branch-2.10] Backport disabling disk usage threshold …
Browse files Browse the repository at this point in the history
…for Elastic Testcontainers (#20676)
  • Loading branch information
lhotari authored Jun 28, 2023
1 parent b355d31 commit 526e216
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@
*/
package org.apache.pulsar.io.elasticsearch;

import org.testcontainers.elasticsearch.ElasticsearchContainer;

import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

@Slf4j
public class ElasticSearchTestBase {

private static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");

protected static ElasticsearchContainer createElasticsearchContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("ingest.geoip.downloader.enabled", "false")
.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String()));


}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.tests.integration.io.sinks;

import static org.testng.Assert.assertTrue;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
Expand Down Expand Up @@ -93,7 +91,18 @@ public ElasticSearchSinkTester(boolean schemaEnable) {

@Override
protected ElasticSearchContainer createSinkService(PulsarCluster cluster) {
return new ElasticSearchContainer(cluster.getClusterName());
ElasticSearchContainer elasticsearchContainer = new ElasticSearchContainer(cluster.getClusterName());
configureElasticContainer(elasticsearchContainer);
return elasticsearchContainer;
}

protected void configureElasticContainer(ElasticSearchContainer elasticContainer) {
elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false");

// allow disk to fill up beyond default 90% threshold
elasticContainer.withEnv("cluster.routing.allocation.disk.threshold_enabled", "false");

elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
.withEnv("journalSyncData", "false")
.withEnv("journalMaxGroupWaitMSec", "0")
.withEnv("clusterName", clusterName)
.withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95")
.withEnv("diskUsageThreshold", "0.99")
.withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97")
.withEnv("nettyMaxFrameSizeBytes", "" + spec.maxMessageSize)
)
);
Expand Down

0 comments on commit 526e216

Please sign in to comment.