Skip to content

Commit

Permalink
feat: support producing to specific partition (#4241)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev authored Nov 7, 2024
1 parent 187c78b commit bbd9f6b
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 29 deletions.
12 changes: 11 additions & 1 deletion crates/fluvio-cli/src/client/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod cmd {
ProduceOutput, DeliverySemantic, SmartModuleContextData, Isolation, SmartModuleInvocation,
};
use fluvio_extension_common::Terminal;
use fluvio_types::print_cli_ok;
use fluvio_types::{print_cli_ok, PartitionId};

#[cfg(feature = "producer-file-io")]
use fluvio_cli_common::user_input::{UserInputRecords, UserInputType};
Expand Down Expand Up @@ -171,6 +171,10 @@ mod cmd {
/// E.g. fluvio produce topic-name --transforms-line='{"uses":"infinyon/[email protected]","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}'
#[arg(long, conflicts_with_all = &["smartmodule_group", "transforms"], alias = "transform")]
pub transforms_line: Vec<String>,

/// Partition id
#[arg(short = 'p', long, value_name = "integer")]
pub partition: Option<PartitionId>,
}

fn validate_key_separator(separator: &str) -> std::result::Result<String, String> {
Expand Down Expand Up @@ -243,6 +247,12 @@ mod cmd {
let config_builder =
config_builder.smartmodules(self.smartmodule_invocations(initial_param)?);

let config_builder = if let Some(partition) = self.partition {
config_builder.set_specific_partitioner(partition)
} else {
config_builder
};

let config = config_builder
.delivery_semantic(self.delivery_semantic)
.build()
Expand Down
8 changes: 4 additions & 4 deletions crates/fluvio-controlplane-metadata/src/partition/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,17 @@ impl PartitionSpec {
match mirror {
PartitionMirrorConfig::Remote(remote) => {
if remote.target {
format!("{}(to-home)", external)
} else {
format!("{}(from-home)", external)
} else {
format!("{}(to-home)", external)
}
}

PartitionMirrorConfig::Home(home) => {
if home.source {
format!("{}(from-remote)", external)
} else {
format!("{}(to-remote)", external)
} else {
format!("{}(from-remote)", external)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-controlplane-metadata/src/topic/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,9 @@ impl ReplicaSpec {
}
MirrorConfig::Home(home_config) => {
if home_config.0.source {
"from-remote"
} else {
"to-remote"
} else {
"from-remote"
}
}
},
Expand Down
9 changes: 9 additions & 0 deletions crates/fluvio/src/producer/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ use fluvio_spu_schema::Isolation;
use fluvio_spu_schema::server::smartmodule::SmartModuleInvocation;

use fluvio_compression::Compression;
use fluvio_types::PartitionId;
use serde::{Serialize, Deserialize};

use crate::producer::partitioning::{Partitioner, SiphashRoundRobinPartitioner};

use super::partitioning::SpecificPartitioner;

const DEFAULT_LINGER_MS: u64 = 100;
const DEFAULT_TIMEOUT_MS: u64 = 1500;
const DEFAULT_BATCH_SIZE_BYTES: usize = 16_384;
Expand Down Expand Up @@ -117,6 +120,12 @@ pub struct TopicProducerConfig {
pub(crate) smartmodules: Vec<SmartModuleInvocation>,
}

impl TopicProducerConfigBuilder {
pub fn set_specific_partitioner(self, partition_id: PartitionId) -> Self {
self.partitioner(Box::new(SpecificPartitioner::new(partition_id)))
}
}

impl TopicProducerConfig {
pub fn linger(&self) -> Duration {
self.linger
Expand Down
22 changes: 22 additions & 0 deletions crates/fluvio/src/producer/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,28 @@ fn partition_siphash(key: &[u8], partition_count: PartitionCount) -> PartitionId
}
}

/// A [`Partitioner`] which assigns all records to a specific partition
pub(crate) struct SpecificPartitioner {
partition_id: PartitionId,
}

impl SpecificPartitioner {
pub fn new(partition_id: PartitionId) -> Self {
Self { partition_id }
}
}

impl Partitioner for SpecificPartitioner {
fn partition(
&self,
_config: &PartitionerConfig,
_maybe_key: Option<&[u8]>,
_value: &[u8],
) -> PartitionId {
self.partition_id
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
36 changes: 16 additions & 20 deletions tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,16 @@ setup_file() {
}


#TODO: we don't have a way to produce directly for a partition,
# when we have it, we should create a test to produce to a partition 1 than should be consumed by remote 2
# @test "Can produce message to reverse mirror topic from home again" {
# run bash -c 'echo 5 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"'
# assert_success
# run bash -c 'echo e | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"'
# assert_success
# run bash -c 'echo 6 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"'
# assert_success
# run bash -c 'echo f | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"'
# assert_success
# }
@test "Can produce message to reverse mirror topic from home again" {
run bash -c 'echo 5 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"'
assert_success
run bash -c 'echo e | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"'
assert_success
run bash -c 'echo 6 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"'
assert_success
run bash -c 'echo f | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"'
assert_success
}

@test "Can switch to remote cluster 2" {
run timeout 15s "$FLUVIO_BIN" profile switch "$REMOTE_PROFILE_2"
Expand Down Expand Up @@ -250,14 +248,12 @@ setup_file() {
assert_failure
}

#TODO: we don't have a way to produce directly for a partition,
# when we have it, we should create a test to produce to a partition 1 than should be consumed by remote 2
# @test "Can consume message from reverse mirror topic from remote 2" {
# sleep 5
# run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d
# assert_output 5$'\n'e$'\n'6$'\n'f
# assert_success
# }
@test "Can consume message from reverse mirror topic from remote 2" {
sleep 5
run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d
assert_output 5$'\n'e$'\n'6$'\n'f
assert_success
}

@test "Can't delete mirror topic from remote 2" {
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
Expand Down
4 changes: 2 additions & 2 deletions tests/cli/mirroring_smoke_tests/mirror-topic-reverse.bats
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ setup_file() {
@test "List topics" {
run bash -c 'timeout 15s "$FLUVIO_BIN" topic list | grep "$TOPIC_NAME"'
assert_success
assert_line --partial --index 0 "$TOPIC_NAME from-remote"
assert_line --partial --index 0 "$TOPIC_NAME to-remote"
}

@test "List partitions" {
run bash -c 'timeout 15s "$FLUVIO_BIN" partition list | grep "$TOPIC_NAME"'
assert_success
assert_line --partial --index 0 "$TOPIC_NAME 0 5001 $REMOTE_NAME(from-remote)"
assert_line --partial --index 0 "$TOPIC_NAME 0 5001 $REMOTE_NAME(to-remote)"
}

0 comments on commit bbd9f6b

Please sign in to comment.