Skip to content

Commit

Permalink
test: improve testing config
Browse files Browse the repository at this point in the history
Instead of having X different macros and control flow exits, let's have
one mechanism to read env variables and decide if a test shall run or
not.

This also moves the "which broker implementation supports which features"
from a bunch of environment variables into a nice enum. This will
greatly help when we expand the set of admin APIs, see #157 for example.
  • Loading branch information
crepererum committed Aug 24, 2022
1 parent 2f855b2 commit 6565321
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 131 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ jobs:
RUST_LOG: "trace"
# Run integration tests
TEST_INTEGRATION: 1
TEST_BROKER_IMPL: redpanda
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
Expand Down Expand Up @@ -284,8 +285,7 @@ jobs:
RUST_LOG: "trace"
# Run integration tests
TEST_INTEGRATION: 1
# Kafka support DeleteRecords
TEST_DELETE_RECORDS: 1
TEST_BROKER_IMPL: kafka
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ $ docker-compose -f docker-compose-redpanda.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9011 cargo test
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test
```

in another session.
Expand All @@ -131,7 +131,7 @@ $ docker-compose -f docker-compose-kafka.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9011 cargo test
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
```

in another session. Note that Apache Kafka supports a different set of features than redpanda, so we pass other
Expand Down Expand Up @@ -231,14 +231,14 @@ execution that hooks right into the place where it is about to exit:
Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
```

If you find a benchmark that is too slow, you may want to profile it. Get [cargo-with], and [perf], then run (here
for the `parallel/rskafka` benchmark):

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
bench --all-features --bench write_throughput -- \
--bench --noplot parallel/rskafka
```
Expand Down
92 changes: 61 additions & 31 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@ use test_helpers::{maybe_start_logging, now, random_topic_name, record};
async fn test_plain() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
}

#[tokio::test]
async fn test_topic_crud() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
let topics = client.list_topics().await.unwrap();

Expand Down Expand Up @@ -77,10 +83,13 @@ async fn test_topic_crud() {
async fn test_partition_client() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();

let controller_client = client.controller_client().unwrap();
controller_client
Expand All @@ -100,10 +109,13 @@ async fn test_partition_client() {
async fn test_non_existing_partition() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();

// do NOT create the topic

Expand Down Expand Up @@ -167,8 +179,8 @@ async fn test_tls() {
.with_single_cert(vec![producer_root], private_key)
.unwrap();

let connection = maybe_skip_kafka_integration!();
ClientBuilder::new(connection)
let test_cfg = maybe_skip_kafka_integration!();
ClientBuilder::new(test_cfg.bootstrap_brokers)
.tls_config(Arc::new(config))
.build()
.await
Expand All @@ -180,14 +192,11 @@ async fn test_tls() {
async fn test_socks5() {
maybe_start_logging();

// e.g. "my-connection-kafka-bootstrap:9092"
let connection = maybe_skip_kafka_integration!();
// e.g. "localhost:1080"
let proxy = maybe_skip_SOCKS_PROXY!();
let test_cfg = maybe_skip_kafka_integration!(socks5);
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection)
.socks5_proxy(proxy)
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.socks5_proxy(test_cfg.socks5_proxy.unwrap())
.build()
.await
.unwrap();
Expand Down Expand Up @@ -222,11 +231,14 @@ async fn test_socks5() {
async fn test_produce_empty() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_partitions = 2;

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
Expand All @@ -247,11 +259,14 @@ async fn test_produce_empty() {
async fn test_consume_empty() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_partitions = 2;

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
Expand All @@ -274,11 +289,14 @@ async fn test_consume_empty() {
async fn test_consume_offset_out_of_range() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_partitions = 2;

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
Expand Down Expand Up @@ -314,11 +332,11 @@ async fn test_consume_offset_out_of_range() {
async fn test_get_offset() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_partitions = 1;

let client = ClientBuilder::new(connection.clone())
let client = ClientBuilder::new(test_cfg.bootstrap_brokers.clone())
.build()
.await
.unwrap();
Expand Down Expand Up @@ -382,10 +400,13 @@ async fn test_get_offset() {
async fn test_produce_consume_size_cutoff() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
Expand Down Expand Up @@ -460,10 +481,13 @@ async fn test_produce_consume_size_cutoff() {
async fn test_consume_midbatch() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
Expand Down Expand Up @@ -508,10 +532,13 @@ async fn test_consume_midbatch() {
async fn test_delete_records() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!(delete);
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
Expand Down Expand Up @@ -555,7 +582,10 @@ async fn test_delete_records() {
let offset_4 = offsets[0];

// delete from the middle of the 2nd batch
maybe_skip_delete!(partition_client, offset_3);
partition_client
.delete_records(offset_3, 1_000)
.await
.unwrap();

// fetching data before the record fails
let err = partition_client
Expand Down
63 changes: 46 additions & 17 deletions tests/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ mod test_helpers;
async fn test_stream_consumer_start_at_0() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -77,8 +80,11 @@ async fn test_stream_consumer_start_at_0() {
async fn test_stream_consumer_start_at_1() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -121,8 +127,11 @@ async fn test_stream_consumer_start_at_1() {
async fn test_stream_consumer_offset_out_of_range() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -157,8 +166,11 @@ async fn test_stream_consumer_offset_out_of_range() {
async fn test_stream_consumer_start_at_earliest() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -212,8 +224,11 @@ async fn test_stream_consumer_start_at_earliest() {
async fn test_stream_consumer_start_at_earliest_empty() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -257,8 +272,16 @@ async fn test_stream_consumer_start_at_earliest_empty() {
async fn test_stream_consumer_start_at_earliest_after_deletion() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!(delete);
if !test_cfg.broker_impl.supports_deletes() {
println!("Skipping due to missing delete support");
return;
}

let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand All @@ -284,7 +307,7 @@ async fn test_stream_consumer_start_at_earliest_after_deletion() {
.await
.unwrap();

maybe_skip_delete!(partition_client, 1);
partition_client.delete_records(1, 1_000).await.unwrap();

let mut stream =
StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::Earliest)
Expand All @@ -304,8 +327,11 @@ async fn test_stream_consumer_start_at_earliest_after_deletion() {
async fn test_stream_consumer_start_at_latest() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -353,8 +379,11 @@ async fn test_stream_consumer_start_at_latest() {
async fn test_stream_consumer_start_at_latest_empty() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down
Loading

0 comments on commit 6565321

Please sign in to comment.