diff --git a/crates/fluvio-cli/src/client/produce/mod.rs b/crates/fluvio-cli/src/client/produce/mod.rs index 3c6846330c..828547788c 100644 --- a/crates/fluvio-cli/src/client/produce/mod.rs +++ b/crates/fluvio-cli/src/client/produce/mod.rs @@ -24,7 +24,7 @@ mod cmd { ProduceOutput, DeliverySemantic, SmartModuleContextData, Isolation, SmartModuleInvocation, }; use fluvio_extension_common::Terminal; - use fluvio_types::print_cli_ok; + use fluvio_types::{print_cli_ok, PartitionId}; #[cfg(feature = "producer-file-io")] use fluvio_cli_common::user_input::{UserInputRecords, UserInputType}; @@ -171,6 +171,10 @@ mod cmd { /// E.g. fluvio produce topic-name --transforms-line='{"uses":"infinyon/jolt@0.1.0","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}' #[arg(long, conflicts_with_all = &["smartmodule_group", "transforms"], alias = "transform")] pub transforms_line: Vec, + + /// Partition id + #[arg(short = 'p', long, value_name = "integer")] + pub partition: Option, } fn validate_key_separator(separator: &str) -> std::result::Result { @@ -243,6 +247,12 @@ mod cmd { let config_builder = config_builder.smartmodules(self.smartmodule_invocations(initial_param)?); + let config_builder = if let Some(partition) = self.partition { + config_builder.set_specific_partitioner(partition) + } else { + config_builder + }; + let config = config_builder .delivery_semantic(self.delivery_semantic) .build() diff --git a/crates/fluvio-controlplane-metadata/src/partition/spec.rs b/crates/fluvio-controlplane-metadata/src/partition/spec.rs index 7f5d27f9d2..591952b3fb 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/spec.rs @@ -82,17 +82,17 @@ impl PartitionSpec { match mirror { PartitionMirrorConfig::Remote(remote) => { if remote.target { - format!("{}(to-home)", external) - } else { format!("{}(from-home)", external) + } else { + format!("{}(to-home)", external) } } PartitionMirrorConfig::Home(home) => { if home.source { - format!("{}(from-remote)", external) - } else { format!("{}(to-remote)", external) + } else { + format!("{}(from-remote)", external) } } } diff --git a/crates/fluvio-controlplane-metadata/src/topic/spec.rs b/crates/fluvio-controlplane-metadata/src/topic/spec.rs index ed23143bdb..6109999c42 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/spec.rs @@ -285,9 +285,9 @@ impl ReplicaSpec { } MirrorConfig::Home(home_config) => { if home_config.0.source { - "from-remote" - } else { "to-remote" + } else { + "from-remote" } } }, diff --git a/crates/fluvio/src/producer/config.rs b/crates/fluvio/src/producer/config.rs index d8add9816d..0fa1849d93 100644 --- a/crates/fluvio/src/producer/config.rs +++ b/crates/fluvio/src/producer/config.rs @@ -9,10 +9,13 @@ use fluvio_spu_schema::Isolation; use fluvio_spu_schema::server::smartmodule::SmartModuleInvocation; use fluvio_compression::Compression; +use fluvio_types::PartitionId; use serde::{Serialize, Deserialize}; use crate::producer::partitioning::{Partitioner, SiphashRoundRobinPartitioner}; +use super::partitioning::SpecificPartitioner; + const DEFAULT_LINGER_MS: u64 = 100; const DEFAULT_TIMEOUT_MS: u64 = 1500; const DEFAULT_BATCH_SIZE_BYTES: usize = 16_384; @@ -117,6 +120,12 @@ pub struct TopicProducerConfig { pub(crate) smartmodules: Vec, } +impl TopicProducerConfigBuilder { + pub fn set_specific_partitioner(self, partition_id: PartitionId) -> Self { + self.partitioner(Box::new(SpecificPartitioner::new(partition_id))) + } +} + impl TopicProducerConfig { pub fn linger(&self) -> Duration { self.linger diff --git a/crates/fluvio/src/producer/partitioning.rs b/crates/fluvio/src/producer/partitioning.rs index 9b0c033a30..a466febfa0 100644 --- a/crates/fluvio/src/producer/partitioning.rs +++ b/crates/fluvio/src/producer/partitioning.rs @@ -81,6 +81,28 @@ fn partition_siphash(key: &[u8], partition_count: PartitionCount) -> PartitionId } } +/// A [`Partitioner`] which assigns all records to a specific partition +pub(crate) struct SpecificPartitioner { + partition_id: PartitionId, +} + +impl SpecificPartitioner { + pub fn new(partition_id: PartitionId) -> Self { + Self { partition_id } + } +} + +impl Partitioner for SpecificPartitioner { + fn partition( + &self, + _config: &PartitionerConfig, + _maybe_key: Option<&[u8]>, + _value: &[u8], + ) -> PartitionId { + self.partition_id + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats index a2dfcbd6c0..9903ac7e56 100644 --- a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats +++ b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats @@ -195,18 +195,16 @@ setup_file() { } -#TODO: we don't have a way to produce directly for a partition, -# when we have it, we should create a test to produce to a partition 1 than should be consumed by remote 2 -# @test "Can produce message to reverse mirror topic from home again" { -# run bash -c 'echo 5 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' -# assert_success -# run bash -c 'echo e | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' -# assert_success -# run bash -c 'echo 6 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' -# assert_success -# run bash -c 'echo f | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' -# assert_success -# } +@test "Can produce message to reverse mirror topic from home again" { + run bash -c 'echo 5 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' + assert_success + run bash -c 'echo e | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' + assert_success + run bash -c 'echo 6 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' + assert_success + run bash -c 'echo f | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' + assert_success +} @test "Can switch to remote cluster 2" { run timeout 15s "$FLUVIO_BIN" profile switch "$REMOTE_PROFILE_2" @@ -250,14 +248,12 @@ setup_file() { assert_failure } -#TODO: we don't have a way to produce directly for a partition, -# when we have it, we should create a test to produce to a partition 1 than should be consumed by remote 2 -# @test "Can consume message from reverse mirror topic from remote 2" { -# sleep 5 -# run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d -# assert_output 5$'\n'e$'\n'6$'\n'f -# assert_success -# } +@test "Can consume message from reverse mirror topic from remote 2" { + sleep 5 + run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d + assert_output 5$'\n'e$'\n'6$'\n'f + assert_success +} @test "Can't delete mirror topic from remote 2" { run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME" diff --git a/tests/cli/mirroring_smoke_tests/mirror-topic-reverse.bats b/tests/cli/mirroring_smoke_tests/mirror-topic-reverse.bats index 741f208b8a..58dfa082a4 100644 --- a/tests/cli/mirroring_smoke_tests/mirror-topic-reverse.bats +++ b/tests/cli/mirroring_smoke_tests/mirror-topic-reverse.bats @@ -81,11 +81,11 @@ setup_file() { @test "List topics" { run bash -c 'timeout 15s "$FLUVIO_BIN" topic list | grep "$TOPIC_NAME"' assert_success - assert_line --partial --index 0 "$TOPIC_NAME from-remote" + assert_line --partial --index 0 "$TOPIC_NAME to-remote" } @test "List partitions" { run bash -c 'timeout 15s "$FLUVIO_BIN" partition list | grep "$TOPIC_NAME"' assert_success - assert_line --partial --index 0 "$TOPIC_NAME 0 5001 $REMOTE_NAME(from-remote)" + assert_line --partial --index 0 "$TOPIC_NAME 0 5001 $REMOTE_NAME(to-remote)" }