test: improve testing config #172

Merged · 2 commits · Aug 24, 2022
4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -207,6 +207,7 @@ jobs:
       RUST_LOG: "trace"
       # Run integration tests
       TEST_INTEGRATION: 1
+      TEST_BROKER_IMPL: redpanda
       TEST_JAVA_INTEROPT: 1
       # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
       # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
@@ -284,8 +285,7 @@ jobs:
       RUST_LOG: "trace"
       # Run integration tests
       TEST_INTEGRATION: 1
-      # Kafka support DeleteRecords
-      TEST_DELETE_RECORDS: 1
+      TEST_BROKER_IMPL: kafka
       TEST_JAVA_INTEROPT: 1
       # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
       # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
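This change swaps the per-feature flag `TEST_DELETE_RECORDS` for a single `TEST_BROKER_IMPL` variable naming the broker under test, so feature support can be derived from the broker name instead of threading one environment variable per feature through CI. A minimal sketch of that idea (the helper name and `main` wrapper are illustrative, not from the repository):

```rust
use std::env;

/// Derive feature support from the broker name instead of a dedicated
/// TEST_DELETE_RECORDS flag. The retired config set TEST_DELETE_RECORDS=1
/// only for the Kafka job; TEST_BROKER_IMPL carries the same information.
fn broker_supports_delete_records() -> bool {
    matches!(env::var("TEST_BROKER_IMPL").as_deref(), Ok("kafka"))
}

fn main() {
    println!(
        "DeleteRecords exercised: {}",
        broker_supports_delete_records()
    );
}
```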
8 changes: 4 additions & 4 deletions README.md
@@ -115,7 +115,7 @@ $ docker-compose -f docker-compose-redpanda.yml up
 in one session, and then run:
 
 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9011 cargo test
+$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test
 ```
 
 in another session.
@@ -131,7 +131,7 @@ $ docker-compose -f docker-compose-kafka.yml up
 in one session, and then run:
 
 ```console
-$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9011 cargo test
+$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
 ```
 
 in another session. Note that Apache Kafka supports a different set of features than redpanda, so we pass other
@@ -231,14 +231,14 @@ execution that hooks right into the place where it is about to exit:
 Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:
 
 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
+$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
 ```
 
 If you find a benchmark that is too slow, you may want to profile it. Get [cargo-with] and [perf], then run (here
 for the `parallel/rskafka` benchmark):
 
 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
+$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
   bench --all-features --bench write_throughput -- \
   --bench --noplot parallel/rskafka
 ```
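With the per-broker flag in place, tests that depend on broker-specific behaviour (such as DeleteRecords, which this suite exercises only against Apache Kafka) can decide for themselves whether to run, instead of each feature being toggled by its own environment variable; the `maybe_skip_kafka_integration!` variants in tests/client.rs below show how the call sites consume it.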
97 changes: 64 additions & 33 deletions tests/client.rs
@@ -10,22 +10,28 @@ use rskafka::{
 use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration};
 
 mod test_helpers;
-use test_helpers::{maybe_start_logging, now, random_topic_name, record};
+use test_helpers::{maybe_start_logging, now, random_topic_name, record, TEST_TIMEOUT};
 
 #[tokio::test]
 async fn test_plain() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
 }
 
 #[tokio::test]
 async fn test_topic_crud() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     let topics = client.list_topics().await.unwrap();
 
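The call sites now receive a `test_cfg` value instead of a bare connection string, and the macro gains modifiers (`socks5` and `delete` appear later in this diff). `tests/test_helpers.rs` is not part of the diff, so the following is only a sketch consistent with the call sites: the field names `bootstrap_brokers` and `socks5_proxy` are visible in the changed tests, while `broker_impl`, the env-var handling, and all macro bodies are assumptions:

```rust
// Hypothetical reconstruction of the new test_helpers API; not shown in this PR.
pub struct TestConfig {
    pub bootstrap_brokers: Vec<String>, // ClientBuilder::new takes Vec<String>
    pub broker_impl: String,            // from TEST_BROKER_IMPL ("kafka"/"redpanda")
    pub socks5_proxy: Option<String>,   // from SOCKS_PROXY, if set
}

#[macro_export]
macro_rules! maybe_skip_kafka_integration {
    // Plain variant: skip unless TEST_INTEGRATION is set, then read the config.
    () => {{
        if std::env::var("TEST_INTEGRATION").is_err() {
            eprintln!("skipping integration test: TEST_INTEGRATION is not set");
            return;
        }
        crate::test_helpers::TestConfig {
            bootstrap_brokers: std::env::var("KAFKA_CONNECT")
                .expect("KAFKA_CONNECT must be set")
                .split(',')
                .map(|s| s.trim().to_owned())
                .collect(),
            broker_impl: std::env::var("TEST_BROKER_IMPL")
                .expect("TEST_BROKER_IMPL must be set"),
            socks5_proxy: std::env::var("SOCKS_PROXY").ok(),
        }
    }};
    // `socks5` variant: additionally require a proxy address.
    (socks5) => {{
        let cfg = maybe_skip_kafka_integration!();
        if cfg.socks5_proxy.is_none() {
            eprintln!("skipping integration test: SOCKS_PROXY is not set");
            return;
        }
        cfg
    }};
    // `delete` variant: DeleteRecords is only exercised against Apache Kafka.
    (delete) => {{
        let cfg = maybe_skip_kafka_integration!();
        if cfg.broker_impl != "kafka" {
            eprintln!("skipping integration test: broker lacks DeleteRecords");
            return;
        }
        cfg
    }};
}
```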
@@ -46,7 +52,7 @@ async fn test_topic_crud() {
         .unwrap();
 
     // might take a while to converge
-    tokio::time::timeout(Duration::from_millis(1_000), async {
+    tokio::time::timeout(TEST_TIMEOUT, async {
         loop {
             let topics = client.list_topics().await.unwrap();
             let topic = topics.iter().find(|t| t.name == new_topic);
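`TEST_TIMEOUT` replaces the hard-coded one-second convergence window and is imported from `test_helpers` at the top of the file. Its definition is not shown in this diff; a plausible one (the 10-second value is purely an assumption) would be:

```rust
use std::time::Duration;

// Shared upper bound for convergence waits in integration tests;
// the actual value in test_helpers.rs is not visible in this PR.
pub const TEST_TIMEOUT: Duration = Duration::from_secs(10);
```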
@@ -77,10 +83,13 @@
 async fn test_partition_client() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
 
     let controller_client = client.controller_client().unwrap();
     controller_client
@@ -100,13 +109,17 @@
 async fn test_non_existing_partition() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
 
     // do NOT create the topic
 
+    // short timeout, should just check that we will never finish
     tokio::time::timeout(Duration::from_millis(100), async {
         client
             .partition_client(topic_name.clone(), 0, UnknownTopicHandling::Retry)
@@ -167,8 +180,8 @@ async fn test_tls() {
         .with_single_cert(vec![producer_root], private_key)
         .unwrap();
 
-    let connection = maybe_skip_kafka_integration!();
-    ClientBuilder::new(connection)
+    let test_cfg = maybe_skip_kafka_integration!();
+    ClientBuilder::new(test_cfg.bootstrap_brokers)
         .tls_config(Arc::new(config))
         .build()
         .await
@@ -180,14 +193,11 @@
 async fn test_socks5() {
     maybe_start_logging();
 
-    // e.g. "my-connection-kafka-bootstrap:9092"
-    let connection = maybe_skip_kafka_integration!();
-    // e.g. "localhost:1080"
-    let proxy = maybe_skip_SOCKS_PROXY!();
+    let test_cfg = maybe_skip_kafka_integration!(socks5);
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection)
-        .socks5_proxy(proxy)
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .socks5_proxy(test_cfg.socks5_proxy.unwrap())
         .build()
         .await
         .unwrap();
@@ -222,11 +232,14 @@
 async fn test_produce_empty() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -247,11 +260,14 @@
 async fn test_consume_empty() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -274,11 +290,14 @@
 async fn test_consume_offset_out_of_range() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -314,11 +333,11 @@
 async fn test_get_offset() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
     let n_partitions = 1;
 
-    let client = ClientBuilder::new(connection.clone())
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers.clone())
         .build()
         .await
         .unwrap();
@@ -382,10 +401,13 @@
 async fn test_produce_consume_size_cutoff() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, 1, 1, 5_000)
@@ -460,10 +482,13 @@
 async fn test_consume_midbatch() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, 1, 1, 5_000)
@@ -508,10 +533,13 @@
 async fn test_delete_records() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!(delete);
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, 1, 1, 5_000)
@@ -555,7 +583,10 @@
     let offset_4 = offsets[0];
 
     // delete from the middle of the 2nd batch
-    maybe_skip_delete!(partition_client, offset_3);
+    partition_client
+        .delete_records(offset_3, 1_000)
+        .await
+        .unwrap();
 
     // fetching data before the record fails
     let err = partition_client
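Previously a `maybe_skip_delete!` macro guarded this call site at runtime; with `maybe_skip_kafka_integration!(delete)` at the top of the test, the skip decision is made once from `TEST_BROKER_IMPL` and the call site can simply unwrap. For contrast, a sketch of what the retired macro plausibly did, tied to the retired `TEST_DELETE_RECORDS` flag (reconstructed from its single call site, not from the repository history):

```rust
// Hypothetical shape of the removed helper: only issue the DeleteRecords
// request when the CI job opted in via TEST_DELETE_RECORDS, otherwise end
// the test early instead of failing against a broker without support.
#[macro_export]
macro_rules! maybe_skip_delete {
    ($partition_client:expr, $offset:expr) => {
        if std::env::var("TEST_DELETE_RECORDS").is_ok() {
            $partition_client
                .delete_records($offset, 1_000)
                .await
                .unwrap();
        } else {
            eprintln!("skipping DeleteRecords: TEST_DELETE_RECORDS is not set");
            return;
        }
    };
}
```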