From 3bdf024268e898661dd05bae5f6774f8d7436d92 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 22 Sep 2023 00:24:07 +0530 Subject: [PATCH] parses max_batch_size --- nexus/analyzer/src/lib.rs | 7 +++++++ nexus/flow-rs/src/grpc.rs | 1 + nexus/pt/src/flow_model.rs | 1 + 3 files changed, 9 insertions(+) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 60b2d5873c..e1b3969981 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -249,6 +249,12 @@ impl StatementAnalyzer for PeerDDLAnalyzer { _ => None, }; + let max_batch_size: Option = match raw_options.remove("max_batch_size") + { + Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), + _ => None, + }; + let flow_job = FlowJob { name: cdc.mirror_name.to_string().to_lowercase(), source_peer: cdc.source_peer.to_string().to_lowercase(), @@ -268,6 +274,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer { replication_slot_name, push_batch_size, push_parallelism, + max_batch_size, }; // Error reporting diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index dc4bdd53ee..f15b431066 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -164,6 +164,7 @@ impl FlowGrpcClient { replication_slot_name: replication_slot_name.unwrap_or_default(), push_batch_size: job.push_batch_size.unwrap_or_default(), push_parallelism: job.push_parallelism.unwrap_or_default(), + max_batch_size: job.max_batch_size.unwrap_or_default(), ..Default::default() }; diff --git a/nexus/pt/src/flow_model.rs b/nexus/pt/src/flow_model.rs index e407bc287b..cad02e6dfc 100644 --- a/nexus/pt/src/flow_model.rs +++ b/nexus/pt/src/flow_model.rs @@ -75,6 +75,7 @@ pub struct FlowJob { pub replication_slot_name: Option, pub push_parallelism: Option, pub push_batch_size: Option, + pub max_batch_size: Option, } #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]