From f3dae903827fc810ef893393651c90ab291f478a Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 22 Sep 2023 00:24:07 +0530 Subject: [PATCH] parses max_batch_size --- nexus/analyzer/src/lib.rs | 7 +++++++ nexus/flow-rs/src/grpc.rs | 1 + nexus/pt/src/flow_model.rs | 1 + 3 files changed, 9 insertions(+) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 60b2d5873..e1b396998 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -249,6 +249,12 @@ impl StatementAnalyzer for PeerDDLAnalyzer { _ => None, }; + let max_batch_size: Option = match raw_options.remove("max_batch_size") + { + Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), + _ => None, + }; + let flow_job = FlowJob { name: cdc.mirror_name.to_string().to_lowercase(), source_peer: cdc.source_peer.to_string().to_lowercase(), @@ -268,6 +274,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer { replication_slot_name, push_batch_size, push_parallelism, + max_batch_size, }; // Error reporting diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index dc4bdd53e..f15b43106 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -164,6 +164,7 @@ impl FlowGrpcClient { replication_slot_name: replication_slot_name.unwrap_or_default(), push_batch_size: job.push_batch_size.unwrap_or_default(), push_parallelism: job.push_parallelism.unwrap_or_default(), + max_batch_size: job.max_batch_size.unwrap_or_default(), ..Default::default() }; diff --git a/nexus/pt/src/flow_model.rs b/nexus/pt/src/flow_model.rs index e407bc287..cad02e6df 100644 --- a/nexus/pt/src/flow_model.rs +++ b/nexus/pt/src/flow_model.rs @@ -75,6 +75,7 @@ pub struct FlowJob { pub replication_slot_name: Option, pub push_parallelism: Option, pub push_batch_size: Option, + pub max_batch_size: Option, } #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]