Skip to content

Commit

Permalink
Update to shotover 0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jul 22, 2024
1 parent f720b9f commit 5c4dd9d
Show file tree
Hide file tree
Showing 10 changed files with 955 additions and 705 deletions.
1,548 changes: 882 additions & 666 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ lto = "fat"
codegen-units = 1

[workspace.dependencies]
shotover = { version = "0.2.0", features = ["alpha-transforms"] }
shotover = { version = "0.4.0" }
anyhow = "1.0.42"
serde = { version = "1.0.111", features = ["derive"] }
tracing = "0.1.15"
Expand Down
2 changes: 1 addition & 1 deletion kafka-fetch-rewrite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ async-trait.workspace = true
tracing.workspace = true
typetag.workspace = true
bytes = "1.4.0"
kafka-protocol = "0.7.0"
kafka-protocol = "0.10.0"
31 changes: 25 additions & 6 deletions kafka-fetch-rewrite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,37 @@ use serde::{Deserialize, Serialize};
use shotover::frame::kafka::{KafkaFrame, ResponseBody};
use shotover::frame::Frame;
use shotover::message::Messages;
use shotover::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use shotover::transforms::{
DownChainProtocol, Transform, TransformBuilder, TransformConfig, TransformContextBuilder,
TransformContextConfig, UpChainProtocol, Wrapper,
};

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct KafkaFetchRewriteConfig {
pub result: String,
}

const NAME: &str = "KafkaFetchRewrite";
#[typetag::serde(name = "KafkaFetchRewrite")]
#[async_trait(?Send)]
impl TransformConfig for KafkaFetchRewriteConfig {
async fn get_builder(&self, _chain_name: String) -> Result<Box<dyn TransformBuilder>> {
async fn get_builder(
&self,
_transform_context: TransformContextConfig,
) -> Result<Box<dyn TransformBuilder>> {
Ok(Box::new(KafkaFetchRewriteBuilder {
result: self.result.clone(),
}))
}

fn up_chain_protocol(&self) -> UpChainProtocol {
UpChainProtocol::Any
}

fn down_chain_protocol(&self) -> DownChainProtocol {
DownChainProtocol::Terminating
}
}

#[derive(Clone)]
Expand All @@ -32,14 +47,14 @@ pub struct KafkaFetchRewriteBuilder {
}

impl TransformBuilder for KafkaFetchRewriteBuilder {
fn build(&self) -> Transforms {
Transforms::Custom(Box::new(KafkaFetchRewrite {
fn build(&self, _transform_context: TransformContextBuilder) -> Box<dyn Transform> {
Box::new(KafkaFetchRewrite {
result: self.result.clone(),
}))
})
}

fn get_name(&self) -> &'static str {
"KafkaFetchRewrite"
NAME
}
}

Expand All @@ -49,6 +64,10 @@ pub struct KafkaFetchRewrite {

#[async_trait]
impl Transform for KafkaFetchRewrite {
fn get_name(&self) -> &'static str {
NAME
}

async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> Result<Messages> {
let mut responses = message_wrapper.call_next_transform().await?;

Expand Down
31 changes: 25 additions & 6 deletions redis-get-rewrite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,37 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use shotover::frame::{Frame, RedisFrame};
use shotover::message::Messages;
use shotover::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use shotover::transforms::{
DownChainProtocol, Transform, TransformBuilder, TransformConfig, TransformContextBuilder,
TransformContextConfig, UpChainProtocol, Wrapper,
};

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct RedisGetRewriteConfig {
pub result: String,
}

const NAME: &str = "RedisGetRewrite";
#[typetag::serde(name = "RedisGetRewrite")]
#[async_trait(?Send)]
impl TransformConfig for RedisGetRewriteConfig {
async fn get_builder(&self, _chain_name: String) -> Result<Box<dyn TransformBuilder>> {
async fn get_builder(
&self,
_transform_context: TransformContextConfig,
) -> Result<Box<dyn TransformBuilder>> {
Ok(Box::new(RedisGetRewriteBuilder {
result: self.result.clone(),
}))
}

fn up_chain_protocol(&self) -> UpChainProtocol {
UpChainProtocol::Any
}

fn down_chain_protocol(&self) -> DownChainProtocol {
DownChainProtocol::Terminating
}
}

#[derive(Clone)]
Expand All @@ -27,14 +42,14 @@ pub struct RedisGetRewriteBuilder {
}

impl TransformBuilder for RedisGetRewriteBuilder {
fn build(&self) -> Transforms {
Transforms::Custom(Box::new(RedisGetRewrite {
fn build(&self, _transform_context: TransformContextBuilder) -> Box<dyn Transform> {
Box::new(RedisGetRewrite {
result: self.result.clone(),
}))
})
}

fn get_name(&self) -> &'static str {
"RedisGetRewrite"
NAME
}
}

Expand All @@ -44,6 +59,10 @@ pub struct RedisGetRewrite {

#[async_trait]
impl Transform for RedisGetRewrite {
fn get_name(&self) -> &'static str {
NAME
}

async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> Result<Messages> {
let mut get_indices = vec![];
for (i, message) in message_wrapper.requests.iter_mut().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.74"
channel = "1.79"
components = [ "rustfmt", "clippy" ]
8 changes: 4 additions & 4 deletions shotover-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ redis-get-rewrite = { path = "../redis-get-rewrite" }
kafka-fetch-rewrite = { path = "../kafka-fetch-rewrite" }

[dev-dependencies]
tokio-bin-process = "0.4.0"
docker-compose-runner = "0.2.0"
tokio-bin-process = "0.5.0"
docker-compose-runner = "0.3.0"
tokio = { version = "1.28.0", features = ["full", "macros"] }
redis = { version = "0.23.0", features = ["tokio-comp", "cluster"] }
rdkafka = "0.34.0"
redis = { version = "0.24.0", features = ["tokio-comp", "cluster"] }
rdkafka = "0.36.0"
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3"
services:
kafka:
image: 'bitnami/kafka:3.4.0-debian-11-r22'
Expand Down
5 changes: 2 additions & 3 deletions shotover-bin/redis-get-rewrite-config/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
version: "3.3"
services:
redis-one:
image: library/redis:5.0.9
ports:
- "1111:6379"
volumes:
- ./redis.conf:/usr/local/etc/redis/redis.conf
command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
- ./redis.conf:/usr/local/etc/redis/redis.conf
command: [ "redis-server", "/usr/local/etc/redis/redis.conf" ]
30 changes: 14 additions & 16 deletions shotover-bin/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,22 @@ use tokio_bin_process::event_matcher::EventMatcher;
use tokio_bin_process::{bin_path, BinProcess};

fn docker_compose(yaml_path: &str) -> DockerCompose {
DockerCompose::new(
&[
Image {
name: "library/redis:5.0.9",
log_regex_to_wait_for: r"Ready to accept connections",
timeout: 120,
},
Image {
name: "bitnami/kafka:3.4.0-debian-11-r22",
log_regex_to_wait_for: r"Kafka Server started",
timeout: 120,
},
],
|_| {},
yaml_path,
)
DockerCompose::new(&IMAGE_WAITERS, |_| {}, yaml_path)
}

pub static IMAGE_WAITERS: [Image; 2] = [
Image {
name: "library/redis:5.0.9",
log_regex_to_wait_for: r"Ready to accept connections",
timeout: Duration::from_secs(120),
},
Image {
name: "bitnami/kafka:3.4.0-debian-11-r22",
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
];

async fn shotover(topology_path: &str) -> BinProcess {
let mut shotover = BinProcess::start_binary(
bin_path!("shotover-bin"),
Expand Down

0 comments on commit 5c4dd9d

Please sign in to comment.