Skip to content

Commit

Permalink
test: improve testing config
Browse files Browse the repository at this point in the history
Instead of having X different macros and control flow exits, let's have
one mechanism to read env variables and decide if a test shall run or
not.

This also moves the "which broker implementation supports which features"
from a bunch of environment variables into a nice enum. This will
greatly help when we expand the set of admin APIs, see #157 for example.
  • Loading branch information
crepererum committed Aug 24, 2022
1 parent aa82995 commit ab455e6
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 129 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ jobs:
RUST_LOG: "trace"
# Run integration tests
TEST_INTEGRATION: 1
TEST_BROKER_IMPL: redpanda
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
Expand Down Expand Up @@ -278,8 +279,7 @@ jobs:
RUST_LOG: "trace"
# Run integration tests
TEST_INTEGRATION: 1
# Kafka support DeleteRecords
TEST_DELETE_RECORDS: 1
TEST_BROKER_IMPL: kafka
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ $ docker-compose -f docker-compose-redpanda.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9011 cargo test
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test
```

in another session.
Expand All @@ -130,7 +130,7 @@ $ docker-compose -f docker-compose-kafka.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9011 cargo test
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
```

in another session. Note that Apache Kafka supports a different set of features than redpanda, so we pass other
Expand Down Expand Up @@ -230,14 +230,14 @@ execution that hooks right into the place where it is about to exit:
Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
```

If you find a benchmark that is too slow, you may want to profile it. Get [cargo-with], and [perf], then run (here
for the `parallel/rskafka` benchmark):

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
bench --all-features --bench write_throughput -- \
--bench --noplot parallel/rskafka
```
Expand Down
85 changes: 56 additions & 29 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@ use test_helpers::{maybe_start_logging, now, random_topic_name, record};
async fn test_plain() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
}

#[tokio::test]
async fn test_topic_crud() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
let topics = client.list_topics().await.unwrap();

Expand Down Expand Up @@ -77,10 +83,13 @@ async fn test_topic_crud() {
async fn test_partition_client() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();

let controller_client = client.controller_client().unwrap();
controller_client
Expand Down Expand Up @@ -134,8 +143,8 @@ async fn test_tls() {
.with_single_cert(vec![producer_root], private_key)
.unwrap();

let connection = maybe_skip_kafka_integration!();
ClientBuilder::new(connection)
let test_cfg = maybe_skip_kafka_integration!();
ClientBuilder::new(test_cfg.bootstrap_brokers)
.tls_config(Arc::new(config))
.build()
.await
Expand All @@ -147,14 +156,11 @@ async fn test_tls() {
async fn test_socks5() {
maybe_start_logging();

// e.g. "my-connection-kafka-bootstrap:9092"
let connection = maybe_skip_kafka_integration!();
// e.g. "localhost:1080"
let proxy = maybe_skip_SOCKS_PROXY!();
let test_cfg = maybe_skip_kafka_integration!(socks5);
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection)
.socks5_proxy(proxy)
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.socks5_proxy(test_cfg.socks5_proxy.unwrap())
.build()
.await
.unwrap();
Expand Down Expand Up @@ -186,11 +192,14 @@ async fn test_socks5() {
async fn test_produce_empty() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_partitions = 2;

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
Expand All @@ -208,11 +217,14 @@ async fn test_produce_empty() {
async fn test_consume_empty() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_partitions = 2;

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
Expand All @@ -232,11 +244,14 @@ async fn test_consume_empty() {
async fn test_consume_offset_out_of_range() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_partitions = 2;

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
Expand Down Expand Up @@ -268,11 +283,11 @@ async fn test_consume_offset_out_of_range() {
async fn test_get_offset() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_partitions = 1;

let client = ClientBuilder::new(connection.clone())
let client = ClientBuilder::new(test_cfg.bootstrap_brokers.clone())
.build()
.await
.unwrap();
Expand Down Expand Up @@ -336,10 +351,13 @@ async fn test_get_offset() {
async fn test_produce_consume_size_cutoff() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
Expand Down Expand Up @@ -409,10 +427,13 @@ async fn test_produce_consume_size_cutoff() {
async fn test_consume_midbatch() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
Expand Down Expand Up @@ -454,10 +475,13 @@ async fn test_consume_midbatch() {
async fn test_delete_records() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let test_cfg = maybe_skip_kafka_integration!(delete);
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
Expand Down Expand Up @@ -498,7 +522,10 @@ async fn test_delete_records() {
let offset_4 = offsets[0];

// delete from the middle of the 2nd batch
maybe_skip_delete!(partition_client, offset_3);
partition_client
.delete_records(offset_3, 1_000)
.await
.unwrap();

// fetching data before the record fails
let err = partition_client
Expand Down
63 changes: 46 additions & 17 deletions tests/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ mod test_helpers;
async fn test_stream_consumer_start_at_0() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -72,8 +75,11 @@ async fn test_stream_consumer_start_at_0() {
async fn test_stream_consumer_start_at_1() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -111,8 +117,11 @@ async fn test_stream_consumer_start_at_1() {
async fn test_stream_consumer_offset_out_of_range() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -142,8 +151,11 @@ async fn test_stream_consumer_offset_out_of_range() {
async fn test_stream_consumer_start_at_earliest() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -192,8 +204,11 @@ async fn test_stream_consumer_start_at_earliest() {
async fn test_stream_consumer_start_at_earliest_empty() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -232,8 +247,16 @@ async fn test_stream_consumer_start_at_earliest_empty() {
async fn test_stream_consumer_start_at_earliest_after_deletion() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!(delete);
if !test_cfg.broker_impl.supports_deletes() {
println!("Skipping due to missing delete support");
return;
}

let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand All @@ -254,7 +277,7 @@ async fn test_stream_consumer_start_at_earliest_after_deletion() {
.await
.unwrap();

maybe_skip_delete!(partition_client, 1);
partition_client.delete_records(1, 1_000).await.unwrap();

let mut stream =
StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::Earliest)
Expand All @@ -274,8 +297,11 @@ async fn test_stream_consumer_start_at_earliest_after_deletion() {
async fn test_stream_consumer_start_at_latest() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down Expand Up @@ -318,8 +344,11 @@ async fn test_stream_consumer_start_at_latest() {
async fn test_stream_consumer_start_at_latest_empty() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(connection).build().await.unwrap();
let test_cfg = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
.build()
.await
.unwrap();
let controller_client = client.controller_client().unwrap();

let topic = random_topic_name();
Expand Down
Loading

0 comments on commit ab455e6

Please sign in to comment.