Skip to content

Commit

Permalink
feat: basic support to reassign partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
crepererum committed Jul 21, 2022
1 parent 624bdf9 commit 66791ac
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 2 deletions.
65 changes: 63 additions & 2 deletions src/client/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ use crate::{
protocol::{
error::Error as ProtocolError,
messages::{
CreateTopicRequest, CreateTopicsRequest, ElectLeadersRequest, ElectLeadersTopicRequest,
AlterPartitionReassignmentsPartitionRequest, AlterPartitionReassignmentsRequest,
AlterPartitionReassignmentsTopicRequest, CreateTopicRequest, CreateTopicsRequest,
ElectLeadersRequest, ElectLeadersTopicRequest,
},
primitives::{
Array, CompactArray, CompactString, Int16, Int32, Int8, NullableString, String_,
TaggedFields,
},
primitives::{Array, Int16, Int32, Int8, NullableString, String_},
},
validation::ExactlyOne,
};
Expand Down Expand Up @@ -93,6 +98,62 @@ impl ControllerClient {
.await
}

/// Re-assign partitions.
pub async fn reassign_partitions(
&self,
topic: impl Into<String> + Send,
partition: i32,
replicas: Vec<i32>,
timeout_ms: i32,
) -> Result<()> {
let request = &AlterPartitionReassignmentsRequest {
topics: vec![AlterPartitionReassignmentsTopicRequest {
name: CompactString(topic.into()),
partitions: vec![AlterPartitionReassignmentsPartitionRequest {
partition_index: Int32(partition),
replicas: CompactArray(Some(replicas.into_iter().map(Int32).collect())),
tagged_fields: TaggedFields::default(),
}],
tagged_fields: TaggedFields::default(),
}],
timeout_ms: Int32(timeout_ms),
tagged_fields: TaggedFields::default(),
};

maybe_retry(
&self.backoff_config,
self,
"reassign_partitions",
|| async move {
let broker = self.get().await?;
let response = broker.request(request).await?;

if let Some(protocol_error) = response.error {
return Err(Error::ServerError(protocol_error, Default::default()));
}

let topic = response
.responses
.exactly_one()
.map_err(Error::exactly_one_topic)?;

let partition = topic
.partitions
.exactly_one()
.map_err(Error::exactly_one_partition)?;

match partition.error {
None => Ok(()),
Some(protocol_error) => Err(Error::ServerError(
protocol_error,
partition.error_message.0.unwrap_or_default(),
)),
}
},
)
.await
}

/// Elect leaders for given topic and partition.
pub async fn elect_leaders(
&self,
Expand Down
234 changes: 234 additions & 0 deletions src/protocol/messages/alter_partition_reassignments.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
use std::io::{Read, Write};

use crate::protocol::{
api_key::ApiKey,
api_version::{ApiVersion, ApiVersionRange},
error::Error,
messages::{read_compact_versioned_array, write_compact_versioned_array},
primitives::{CompactArray, CompactNullableString, CompactString, Int16, Int32, TaggedFields},
traits::{ReadType, WriteType},
};

use super::{
ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
};

#[derive(Debug)]
pub struct AlterPartitionReassignmentsRequest {
/// The time in ms to wait for the request to complete.
pub timeout_ms: Int32,

/// The topics to reassign.
pub topics: Vec<AlterPartitionReassignmentsTopicRequest>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

self.timeout_ms.write(writer)?;
write_compact_versioned_array(writer, version, Some(&self.topics))?;
self.tagged_fields.write(writer)?;

Ok(())
}
}

impl RequestBody for AlterPartitionReassignmentsRequest {
type ResponseBody = AlterPartitionReassignmentsResponse;

const API_KEY: ApiKey = ApiKey::AlterPartitionReassignments;

/// All versions.
const API_VERSION_RANGE: ApiVersionRange =
ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(0)));

const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(0));
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsTopicRequest {
/// The topic name.
pub name: CompactString,

/// The partitions to reassign.
pub partitions: Vec<AlterPartitionReassignmentsPartitionRequest>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsTopicRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

self.name.write(writer)?;
write_compact_versioned_array(writer, version, Some(&self.partitions))?;
self.tagged_fields.write(writer)?;

Ok(())
}
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsPartitionRequest {
/// The partition index.
pub partition_index: Int32,

/// The replicas to place the partitions on, or null to cancel a pending reassignment for this partition.
pub replicas: CompactArray<Int32>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsPartitionRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

self.partition_index.write(writer)?;
self.replicas.write(writer)?;
self.tagged_fields.write(writer)?;

Ok(())
}
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsResponse {
/// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the
/// request did not violate any quota.
pub throttle_time_ms: Int32,

/// The top-level error code, or 0 if there was no error.
pub error: Option<Error>,

/// The top-level error message, or null if there was no error.
pub error_message: CompactNullableString,

/// The responses to topics to reassign.
pub responses: Vec<AlterPartitionReassignmentsTopicResponse>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

let throttle_time_ms = Int32::read(reader)?;
let error = Error::new(Int16::read(reader)?.0);
let error_message = CompactNullableString::read(reader)?;
let responses = read_compact_versioned_array(reader, version)?.unwrap_or_default();
let tagged_fields = TaggedFields::read(reader)?;

Ok(Self {
throttle_time_ms,
error,
error_message,
responses,
tagged_fields,
})
}
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsTopicResponse {
/// The topic name
pub name: CompactString,

/// The responses to partitions to reassign
pub partitions: Vec<AlterPartitionReassignmentsPartitionResponse>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsTopicResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

let name = CompactString::read(reader)?;
let partitions = read_compact_versioned_array(reader, version)?.unwrap_or_default();
let tagged_fields = TaggedFields::read(reader)?;

Ok(Self {
name,
partitions,
tagged_fields,
})
}
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsPartitionResponse {
/// The partition index.
pub partition_index: Int32,

/// The error code for this partition, or 0 if there was no error.
pub error: Option<Error>,

/// The error message for this partition, or null if there was no error.
pub error_message: CompactNullableString,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsPartitionResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

let partition_index = Int32::read(reader)?;
let error = Error::new(Int16::read(reader)?.0);
let error_message = CompactNullableString::read(reader)?;
let tagged_fields = TaggedFields::read(reader)?;

Ok(Self {
partition_index,
error,
error_message,
tagged_fields,
})
}
}
2 changes: 2 additions & 0 deletions src/protocol/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use super::{
vec_builder::VecBuilder,
};

mod alter_partition_reassignments;
pub use alter_partition_reassignments::*;
mod api_versions;
pub use api_versions::*;
mod constants;
Expand Down
21 changes: 21 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,27 @@ async fn test_delete_records() {
);
}

#[tokio::test]
async fn test_reassign_partitions() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();

let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
.await
.unwrap();

controller_client
.reassign_partitions(&topic_name, 0, vec![0, 1], 5_000)
.await
.unwrap();
}

#[tokio::test]
async fn test_elect_leaders() {
maybe_start_logging();
Expand Down

0 comments on commit 66791ac

Please sign in to comment.