diff --git a/.circleci/config.yml b/.circleci/config.yml
index b8cfefc..a322acd 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -204,6 +204,7 @@ jobs:
       RUST_LOG: "trace"
       # Run integration tests
       TEST_INTEGRATION: 1
+      TEST_BROKER_IMPL: redpanda
      TEST_JAVA_INTEROPT: 1
       # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
       # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
@@ -278,8 +279,7 @@ jobs:
       RUST_LOG: "trace"
       # Run integration tests
       TEST_INTEGRATION: 1
-      # Kafka support DeleteRecords
-      TEST_DELETE_RECORDS: 1
+      TEST_BROKER_IMPL: kafka
       TEST_JAVA_INTEROPT: 1
       # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
       # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
diff --git a/README.md b/README.md
index bc0b222..1dfcb31 100644
--- a/README.md
+++ b/README.md
@@ -114,7 +114,7 @@ $ docker-compose -f docker-compose-redpanda.yml up
 in one session, and then run:
 
 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9011 cargo test
+$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test
 ```
 
 in another session.
@@ -130,7 +130,7 @@ $ docker-compose -f docker-compose-kafka.yml up
 in one session, and then run:
 
 ```console
-$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9011 cargo test
+$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
 ```
 
 in another session. Note that Apache Kafka supports a different set of features than Redpanda, so we pass other
@@ -230,14 +230,14 @@ execution that hooks right into the place where it is about to exit:
 Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:
 
 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
+$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
 ```
 
 If you find a benchmark that is too slow, you may want to profile it.
 Get [cargo-with], and [perf], then run (here for the `parallel/rskafka` benchmark):
 
 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
+$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
     bench --all-features --bench write_throughput -- \
     --bench --noplot parallel/rskafka
 ```
diff --git a/tests/client.rs b/tests/client.rs
index dce3d44..e9b2322 100644
--- a/tests/client.rs
+++ b/tests/client.rs
@@ -16,16 +16,22 @@ use test_helpers::{maybe_start_logging, now, random_topic_name, record};
 async fn test_plain() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
 }
 
 #[tokio::test]
 async fn test_topic_crud() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topics = client.list_topics().await.unwrap();
@@ -77,10 +83,13 @@ async fn test_topic_crud() {
 async fn test_partition_client() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     controller_client
@@ -134,8 +143,8 @@ async fn test_tls() {
         .with_single_cert(vec![producer_root], private_key)
         .unwrap();
 
-    let connection = maybe_skip_kafka_integration!();
-    ClientBuilder::new(connection)
+    let test_cfg = maybe_skip_kafka_integration!();
+    ClientBuilder::new(test_cfg.bootstrap_brokers)
         .tls_config(Arc::new(config))
         .build()
         .await
@@ -147,14 +156,11 @@ async fn test_socks5() {
     maybe_start_logging();
 
-    // e.g. "my-connection-kafka-bootstrap:9092"
-    let connection = maybe_skip_kafka_integration!();
-    // e.g. "localhost:1080"
-    let proxy = maybe_skip_SOCKS_PROXY!();
+    let test_cfg = maybe_skip_kafka_integration!(socks5);
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection)
-        .socks5_proxy(proxy)
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .socks5_proxy(test_cfg.socks5_proxy.unwrap())
         .build()
         .await
         .unwrap();
@@ -186,11 +192,14 @@ async fn test_produce_empty() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -208,11 +217,14 @@ async fn test_consume_empty() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -232,11 +244,14 @@ async fn test_consume_offset_out_of_range() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -268,11 +283,11 @@ async fn test_get_offset() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
     let n_partitions = 1;
 
-    let client = ClientBuilder::new(connection.clone())
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers.clone())
         .build()
         .await
         .unwrap();
@@ -336,10 +351,13 @@ async fn test_produce_consume_size_cutoff() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, 1, 1, 5_000)
@@ -409,10 +427,13 @@ async fn test_consume_midbatch() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, 1, 1, 5_000)
@@ -454,10 +475,13 @@ async fn test_consume_midbatch() {
 async fn test_delete_records() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!(delete);
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
     controller_client
         .create_topic(&topic_name, 1, 1, 5_000)
@@ -498,7 +522,10 @@ async fn test_delete_records() {
     let offset_4 = offsets[0];
 
     // delete from the middle of the 2nd batch
-    maybe_skip_delete!(partition_client, offset_3);
+    partition_client
+        .delete_records(offset_3, 1_000)
+        .await
+        .unwrap();
 
     // fetching data before the record fails
     let err = partition_client
diff --git a/tests/consumer.rs b/tests/consumer.rs
index b264e4a..57ed07b 100644
--- a/tests/consumer.rs
+++ b/tests/consumer.rs
@@ -22,8 +22,11 @@ mod test_helpers;
 async fn test_stream_consumer_start_at_0() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topic = random_topic_name();
@@ -72,8 +75,11 @@ async fn test_stream_consumer_start_at_0() {
 async fn test_stream_consumer_start_at_1() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topic = random_topic_name();
@@ -111,8 +117,11 @@ async fn test_stream_consumer_offset_out_of_range() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topic = random_topic_name();
@@ -142,8 +151,11 @@ async fn test_stream_consumer_start_at_earliest() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topic = random_topic_name();
@@ -192,8 +204,11 @@ async fn test_stream_consumer_start_at_earliest_empty() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topic = random_topic_name();
@@ -232,8 +247,16 @@ async fn test_stream_consumer_start_at_earliest_empty() {
 async fn test_stream_consumer_start_at_earliest_after_deletion() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!(delete);
+    if !test_cfg.broker_impl.supports_deletes() {
+        println!("Skipping due to missing delete support");
+        return;
+    }
+
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topic = random_topic_name();
@@ -254,7 +277,7 @@ async fn test_stream_consumer_start_at_earliest_after_deletion() {
         .await
         .unwrap();
 
-    maybe_skip_delete!(partition_client, 1);
+    partition_client.delete_records(1, 1_000).await.unwrap();
 
     let mut stream =
         StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::Earliest)
@@ -274,8 +297,11 @@ async fn test_stream_consumer_start_at_earliest_after_deletion() {
 async fn test_stream_consumer_start_at_latest() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topic = random_topic_name();
@@ -318,8 +344,11 @@ async fn test_stream_consumer_start_at_latest_empty() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topic = random_topic_name();
diff --git a/tests/produce_consume.rs b/tests/produce_consume.rs
index 5f5b2f6..98fc079 100644
--- a/tests/produce_consume.rs
+++ b/tests/produce_consume.rs
@@ -245,11 +245,11 @@ async fn assert_produce_consume(
 {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
+    let test_cfg = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(connection.clone())
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers.clone())
         .build()
         .await
         .unwrap();
@@ -304,7 +304,7 @@ async fn assert_produce_consume(
     offsets.append(
         &mut f_produce(
             Arc::clone(&partition_client),
-            connection.clone(),
+            test_cfg.bootstrap_brokers.clone(),
             topic_name.clone(),
             1,
             vec![record_1.clone(), record_2.clone()],
@@ -315,7 +315,7 @@ async fn assert_produce_consume(
     offsets.append(
         &mut f_produce(
             Arc::clone(&partition_client),
-            connection.clone(),
+            test_cfg.bootstrap_brokers.clone(),
             topic_name.clone(),
             1,
             vec![record_3.clone()],
@@ -329,7 +329,14 @@ async fn assert_produce_consume(
     assert_ne!(offsets[2], offsets[0]);
 
     // consume
-    let actual = f_consume(partition_client, connection, topic_name, 1, 3).await;
+    let actual = f_consume(
+        partition_client,
+        test_cfg.bootstrap_brokers,
+        topic_name,
+        1,
+        3,
+    )
+    .await;
     let expected: Vec<_> = offsets
         .into_iter()
         .zip([record_1, record_2, record_3])
diff --git a/tests/producer.rs b/tests/producer.rs
index d87be79..bba2655 100644
--- a/tests/producer.rs
+++ b/tests/producer.rs
@@ -13,8 +13,11 @@ use test_helpers::{maybe_start_logging, random_topic_name, record};
 async fn test_batch_producer() {
     maybe_start_logging();
 
-    let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(connection).build().await.unwrap();
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
     let controller_client = client.controller_client().unwrap();
 
     let topic = random_topic_name();
diff --git a/tests/test_helpers.rs b/tests/test_helpers.rs
index 6d793f8..6417ca8 100644
--- a/tests/test_helpers.rs
+++ b/tests/test_helpers.rs
@@ -3,93 +3,147 @@ use rskafka::record::Record;
 use std::collections::BTreeMap;
 use time::OffsetDateTime;
 
-/// Get the testing Kafka connection string or return current scope.
+/// Environment variable to configure if integration tests should be run.
 ///
-/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
-/// caller.
-///
-/// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide
-/// guidance for setting `KAFKA_CONNECTION`.
-///
-/// If `TEST_INTEGRATION` is not set, skip the calling test by returning early.
-#[macro_export]
-macro_rules! maybe_skip_kafka_integration {
-    () => {{
-        use std::env;
+/// Accepts a boolean.
+pub const ENV_TEST_INTEGRATION: &str = "TEST_INTEGRATION";
+
+/// Environment variable that contains the list of bootstrap brokers.
+pub const ENV_KAFKA_CONNECT: &str = "KAFKA_CONNECT";
+
+/// Environment variable that determines which broker implementation we use.
+pub const ENV_TEST_BROKER_IMPL: &str = "TEST_BROKER_IMPL";
+
+/// Environment variable that contains the connection string for a SOCKS5 proxy that can be used for testing.
+pub const ENV_SOCKS5_PROXY: &str = "SOCKS5_PROXY";
+
+/// Broker implementation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum BrokerImpl {
+    Kafka,
+    Redpanda,
+}
+
+impl BrokerImpl {
+    #[allow(dead_code)]
+    pub fn supports_deletes(&self) -> bool {
+        match self {
+            BrokerImpl::Kafka => true,
+            // See https://github.com/redpanda-data/redpanda/issues/2648
+            BrokerImpl::Redpanda => false,
+        }
+    }
+}
+
+/// Test config.
+#[derive(Debug)]
+pub struct TestConfig {
+    pub bootstrap_brokers: Vec<String>,
+    pub broker_impl: BrokerImpl,
+    pub socks5_proxy: Option<String>,
+}
+
+impl TestConfig {
+    /// Get test config from environment.
+    pub fn from_env() -> Option<Self> {
         dotenvy::dotenv().ok();
 
-        match (
-            env::var("TEST_INTEGRATION").is_ok(),
-            env::var("KAFKA_CONNECT").ok(),
-        ) {
-            (true, Some(kafka_connection)) => {
-                let kafka_connection: Vec<String> =
-                    kafka_connection.split(",").map(|s| s.to_owned()).collect();
-                kafka_connection
+        match std::env::var(ENV_TEST_INTEGRATION)
+            .ok()
+            .map(|s| parse_as_bool(&s))
+        {
+            None | Some(Ok(false)) => {
+                return None;
             }
-            (true, None) => {
-                panic!(
-                    "TEST_INTEGRATION is set which requires running integration tests, but \
-                    KAFKA_CONNECT is not set. Please run Kafka or Redpanda then \
-                    set KAFKA_CONNECT as directed in README.md."
-                )
-            }
-            (false, Some(_)) => {
-                eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run");
-                return;
-            }
-            (false, None) => {
-                eprintln!(
-                    "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
-                    run"
-                );
-                return;
+            Some(Ok(true)) => {}
+            Some(Err(s)) => {
+                panic!("Invalid value for {ENV_TEST_INTEGRATION}: {s}")
             }
         }
-    }};
+
+        let bootstrap_brokers = std::env::var(ENV_KAFKA_CONNECT)
+            .ok()
+            .unwrap_or_else(|| panic!("{ENV_KAFKA_CONNECT} not set, please read README"))
+            .split(',')
+            .map(|s| s.trim().to_owned())
+            .collect();
+
+        let broker_impl = std::env::var(ENV_TEST_BROKER_IMPL)
+            .ok()
+            .unwrap_or_else(|| panic!("{ENV_TEST_BROKER_IMPL} is required to determine the broker implementation (e.g. kafka, redpanda)"))
+            .to_lowercase();
+        let broker_impl = match broker_impl.as_str() {
+            "kafka" => BrokerImpl::Kafka,
+            "redpanda" => BrokerImpl::Redpanda,
+            other => panic!("Invalid {ENV_TEST_BROKER_IMPL}: {other}"),
+        };
+
+        let socks5_proxy = std::env::var(ENV_SOCKS5_PROXY).ok();
+
+        Some(Self {
+            bootstrap_brokers,
+            broker_impl,
+            socks5_proxy,
+        })
+    }
 }
 
-/// Performs delete operation using.
-///
-/// This is skipped (via `return`) if the broker returns `NoVersionMatch`, except when the `TEST_DELETE_RECORDS`
-/// environment variable is set.
-///
-/// This is helpful because Redpanda does not support deletes yet, see
-/// <https://github.com/redpanda-data/redpanda/issues/2648>,
-/// but we also don not want to skip these test unconditionally.
-#[macro_export]
-macro_rules! maybe_skip_delete {
-    ($partition_client:ident, $offset:expr) => {
-        match $partition_client.delete_records($offset, 1_000).await {
-            Ok(()) => {}
-            Err(rskafka::client::error::Error::Request(
-                rskafka::client::error::RequestError::NoVersionMatch { .. },
-            )) if std::env::var("TEST_DELETE_RECORDS").is_err() => {
-                println!("Skip test_delete_records");
-                return;
-            }
-            Err(e) => panic!("Cannot delete: {e}"),
-        }
-    };
+/// Parse string as boolean variable.
+fn parse_as_bool(s: &str) -> Result<bool, String> {
+    let s_lower = s.to_lowercase();
+
+    match s_lower.as_str() {
+        "0" | "false" | "f" | "no" | "n" => Ok(false),
+        "1" | "true" | "t" | "yes" | "y" => Ok(true),
+        _ => Err(s.to_owned()),
+    }
 }
 
-/// Get the Socks Proxy environment variable.
+/// Get [`TestConfig`] or exit test (by returning).
+///
+/// Takes an optional list of capabilities that are needed to run the test. These are:
 ///
-/// If `SOCKS_PROXY` is not set, fail the tests and provide
-/// guidance for setting `SOCKS_PROXY`.
+/// - `delete`: the broker implementation must support deletes
+/// - `socks5`: a SOCKS5 proxy is available for tests
 #[macro_export]
-macro_rules! maybe_skip_SOCKS_PROXY {
+macro_rules! maybe_skip_kafka_integration {
     () => {{
-        use std::env;
-        dotenvy::dotenv().ok();
-
-        match (env::var("SOCKS_PROXY").ok()) {
-            Some(proxy) => proxy,
-            _ => {
-                eprintln!("skipping integration tests with Proxy - set SOCKS_PROXY to run");
+        match test_helpers::TestConfig::from_env() {
+            Some(cfg) => cfg,
+            None => {
+                eprintln!(
+                    "skipping Kafka integration tests - set {} to run",
+                    test_helpers::ENV_TEST_INTEGRATION
+                );
                 return;
             }
         }
     }};
+    ($cap:ident) => {
+        $crate::maybe_skip_kafka_integration!($cap,)
+    };
+    ($cap:ident, $($other:ident),*,) => {
+        $crate::maybe_skip_kafka_integration!($cap, $($other),*)
+    };
+    (delete, $($other:ident),*) => {{
+        let cfg = $crate::maybe_skip_kafka_integration!($($other),*);
+        if !cfg.broker_impl.supports_deletes() {
+            eprintln!("Skipping due to missing delete support");
+            return;
+        }
+        cfg
+    }};
+    (socks5, $($other:ident),*) => {{
+        let cfg = $crate::maybe_skip_kafka_integration!($($other),*);
+        if cfg.socks5_proxy.is_none() {
+            eprintln!(
+                "skipping integration tests with Proxy - set {} to run",
+                test_helpers::ENV_SOCKS5_PROXY
+            );
+            return;
+        }
+        cfg
+    }};
+    ($cap:ident, $($other:ident),*) => {
+        compile_error!(concat!("invalid capability: ", stringify!($cap)))
+    };
 }
 
 /// Generated random topic name for testing.
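
For reference, here is a minimal sketch of how a test consumes the new capability-aware macro. It is not part of the diff above: the test name and topic setup are illustrative, while the macro, the `TestConfig` fields, and the environment variables are the ones introduced in `tests/test_helpers.rs`:

```rust
use rskafka::client::ClientBuilder;

mod test_helpers;
use test_helpers::{maybe_start_logging, random_topic_name};

// Hypothetical test, for illustration only.
#[tokio::test]
async fn test_delete_records_smoke() {
    maybe_start_logging();

    // Returns early unless TEST_INTEGRATION is truthy, panics if KAFKA_CONNECT or
    // TEST_BROKER_IMPL is missing, and returns early for TEST_BROKER_IMPL=redpanda,
    // because `BrokerImpl::Redpanda.supports_deletes()` is `false`.
    let test_cfg = maybe_skip_kafka_integration!(delete);

    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
        .build()
        .await
        .unwrap();
    let controller_client = client.controller_client().unwrap();

    let topic = random_topic_name();
    controller_client
        .create_topic(&topic, 1, 1, 5_000)
        .await
        .unwrap();
}
```

Run it with the environment documented in the README changes, e.g. `TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test`.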