diff --git a/Cargo.lock b/Cargo.lock index 2f767d12de213..275024641e22e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10463,6 +10463,7 @@ dependencies = [ "async-recursion", "async-trait", "bytes", + "chrono", "criterion", "either", "foyer", @@ -10476,6 +10477,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "memcomparable", + "mysql_async", "opendal 0.49.2", "parking_lot 0.12.1", "parquet 53.0.0", @@ -10493,6 +10495,7 @@ dependencies = [ "risingwave_pb", "risingwave_rpc_client", "risingwave_storage", + "rust_decimal", "rw_futures_util", "scopeguard", "serde_json", @@ -11346,6 +11349,7 @@ dependencies = [ "maplit", "md5", "memcomparable", + "mysql_async", "num-integer", "parking_lot 0.12.1", "parse-display", diff --git a/Cargo.toml b/Cargo.toml index 80166c384efb5..49e421f4d8219 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,6 +161,9 @@ deltalake = { version = "0.20.1", features = [ itertools = "0.13.0" jsonbb = "0.1.4" lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" } +mysql_async = { version = "0.34", default-features = false, features = [ + "default", +] } parquet = { version = "53", features = ["async"] } thiserror-ext = "0.1.2" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } diff --git a/e2e_test/source_inline/tvf/mysql_query.slt b/e2e_test/source_inline/tvf/mysql_query.slt new file mode 100644 index 0000000000000..56acf0598244c --- /dev/null +++ b/e2e_test/source_inline/tvf/mysql_query.slt @@ -0,0 +1,73 @@ +control substitution on + +system ok +mysql -e "DROP DATABASE IF EXISTS tvf; CREATE DATABASE tvf;" + +system ok +mysql -e " +USE tvf; +CREATE TABLE test ( + id bigint primary key, + v0 bit, + v1 bool, + v2 tinyint(1), + v3 tinyint(2), + v4 smallint, + v5 mediumint, + v6 integer, + v7 bigint, + v8 float, + v9 double, + v10 numeric(4, 2), + v11 decimal(4, 2), + v12 char(255), + v13 varchar(255), + v14 bit(10), + v15 tinyblob, + v16 blob, + v17 mediumblob, + v18 longblob, + v19 date, + v20 time, + v21 timestamp, + v22 json, + v23 int +); +INSERT INTO test SELECT + 1 as id, + true as v0, + true as v1, + 2 as v2, + 3 as v3, + 4 as v4, + 5 as v5, + 6 as v6, + 7 as v7, + 1.08 as v8, + 1.09 as v9, + 1.10 as v10, + 1.11 as v11, + 'char' as v12, + 'varchar' as v13, + b'1010' as v14, + x'16' as v15, + x'17' as v16, + x'18' as v17, + x'19' as v18, + '2021-01-01' as v19, + '12:34:56' as v20, + '2021-01-01 12:34:56' as v21, + JSON_OBJECT('key1', 1, 'key2', 'abc') as v22, + null as v23; +" + +query +select * from mysql_query('$MYSQL_HOST', '$MYSQL_TCP_PORT', '$RISEDEV_MYSQL_USER', '$MYSQL_PWD', 'tvf', 'select * from test;'); +---- +1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL + +system ok +mysql -e " +USE tvf; +DROP DATABASE tvf; +" \ No newline at end of file diff --git a/e2e_test/source_legacy/cdc_inline/auto_schema_map_mysql.slt b/e2e_test/source_legacy/cdc_inline/auto_schema_map_mysql.slt index 08afa5d1988a7..e1b5016774a0c 100644 --- a/e2e_test/source_legacy/cdc_inline/auto_schema_map_mysql.slt +++ b/e2e_test/source_legacy/cdc_inline/auto_schema_map_mysql.slt @@ -174,8 +174,8 @@ SELECT c_binary_255 FROM rw_mysql_types_test order by c_boolean; ---- -0 NULL NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 -1 NULL -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 +0 f NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 +1 t -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 query TTTTTTTT SELECT diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index f881f6546fae5..b46230b2438d6 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -104,6 +104,17 @@ message PostgresQueryNode { string query = 7; } +// NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed. +message MySqlQueryNode { + repeated plan_common.ColumnDesc columns = 1; + string hostname = 2; + string port = 3; + string username = 4; + string password = 5; + string database = 6; + string query = 7; +} + message ProjectNode { repeated expr.ExprNode select_list = 1; } @@ -386,6 +397,7 @@ message PlanNode { FileScanNode file_scan = 38; IcebergScanNode iceberg_scan = 39; PostgresQueryNode postgres_query = 40; + MySqlQueryNode mysql_query = 41; // The following nodes are used for testing. bool block_executor = 100; bool busy_loop_executor = 101; diff --git a/proto/expr.proto b/proto/expr.proto index 43e2002933a47..5330843512849 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -365,6 +365,8 @@ message TableFunction { FILE_SCAN = 19; // postgres query POSTGRES_QUERY = 20; + // mysql query + MYSQL_QUERY = 21; // User defined table function USER_DEFINED = 100; } diff --git a/risedev.yml b/risedev.yml index 0ec9b8b09af79..8e3668dcb49c2 100644 --- a/risedev.yml +++ b/risedev.yml @@ -959,6 +959,25 @@ profile: address: schemaregistry port: 8082 + local-inline-source-test: + config-path: src/config/ci-recovery.toml + steps: + - use: minio + - use: sqlite + - use: meta-node + meta-backend: sqlite + - use: compute-node + enable-tiered-cache: true + - use: frontend + - use: compactor + - use: pubsub + persist-data: true + - use: kafka + persist-data: true + - use: schema-registry + - use: mysql + - use: postgres + ci-inline-source-test: config-path: src/config/ci-recovery.toml steps: diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 46c4aa7b9de6e..ee6f757e17376 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -19,6 +19,7 @@ assert_matches = "1" async-recursion = "1" async-trait = "0.1" bytes = "1" +chrono = "0.4" either = "1" foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } @@ -29,6 +30,7 @@ hytra = "0.1.2" iceberg = { workspace = true } itertools = { workspace = true } memcomparable = "0.2" +mysql_async = { workspace = true } opendal = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true } @@ -45,6 +47,7 @@ risingwave_hummock_sdk = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_storage = { workspace = true } +rust_decimal = "1" rw_futures_util = { workspace = true } scopeguard = "1" serde_json = "1" diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index e3e53ee449fac..b0723277a6f5e 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -17,6 +17,8 @@ use std::sync::Arc; pub use anyhow::anyhow; +use iceberg::Error as IcebergError; +use mysql_async::Error as MySqlError; use parquet::errors::ParquetError; use risingwave_common::array::ArrayError; use risingwave_common::error::{def_anyhow_newtype, def_anyhow_variant, BoxedError}; @@ -29,7 +31,7 @@ use risingwave_rpc_client::error::{RpcError, ToTonicStatus}; use risingwave_storage::error::StorageError; use thiserror::Error; use thiserror_ext::Construct; -use tokio_postgres; +use tokio_postgres::Error as PostgresError; use tonic::Status; use crate::worker_manager::worker_node_manager::FragmentId; @@ -192,7 +194,8 @@ def_anyhow_variant! { pub BatchExternalSystemError, BatchError ExternalSystemError, - tokio_postgres::Error => "Postgres error", - iceberg::Error => "Iceberg error", + PostgresError => "Postgres error", + IcebergError => "Iceberg error", ParquetError => "Parquet error", + MySqlError => "MySQL error", } diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index c3bd373198df7..ce84065d9d41c 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -29,6 +29,7 @@ mod managed; mod max_one_row; mod merge_sort; mod merge_sort_exchange; +mod mysql_query; mod order_by; mod postgres_query; mod project; @@ -65,6 +66,7 @@ pub use managed::*; pub use max_one_row::*; pub use merge_sort::*; pub use merge_sort_exchange::*; +pub use mysql_query::*; pub use order_by::*; pub use postgres_query::*; pub use project::*; @@ -247,6 +249,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> { NodeBody::FileScan => FileScanExecutorBuilder, NodeBody::IcebergScan => IcebergScanExecutorBuilder, NodeBody::PostgresQuery => PostgresQueryExecutorBuilder, + NodeBody::MysqlQuery => MySqlQueryExecutorBuilder, // Follow NodeBody only used for test NodeBody::BlockExecutor => BlockExecutorBuilder, NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder, diff --git a/src/batch/src/executor/mysql_query.rs b/src/batch/src/executor/mysql_query.rs new file mode 100644 index 0000000000000..721c9c5e55bf1 --- /dev/null +++ b/src/batch/src/executor/mysql_query.rs @@ -0,0 +1,170 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Context; +use futures_async_stream::try_stream; +use futures_util::stream::StreamExt; +use mysql_async; +use mysql_async::prelude::*; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::row::OwnedRow; +use risingwave_common::util::chunk_coalesce::DataChunkBuilder; +use risingwave_connector::parser::mysql_datum_to_rw_datum; +use risingwave_pb::batch_plan::plan_node::NodeBody; + +use crate::error::{BatchError, BatchExternalSystemError}; +use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, DataChunk, Executor, ExecutorBuilder}; +use crate::task::BatchTaskContext; + +/// `MySqlQuery` executor. Runs a query against a `MySql` database. +pub struct MySqlQueryExecutor { + schema: Schema, + host: String, + port: String, + username: String, + password: String, + database: String, + query: String, + identity: String, + chunk_size: usize, +} + +impl Executor for MySqlQueryExecutor { + fn schema(&self) -> &risingwave_common::catalog::Schema { + &self.schema + } + + fn identity(&self) -> &str { + &self.identity + } + + fn execute(self: Box) -> super::BoxedDataChunkStream { + self.do_execute().boxed() + } +} +pub fn mysql_row_to_owned_row( + mut row: mysql_async::Row, + schema: &Schema, +) -> Result { + let mut datums = vec![]; + for i in 0..schema.fields.len() { + let rw_field = &schema.fields[i]; + let name = rw_field.name.as_str(); + let datum = match mysql_datum_to_rw_datum(&mut row, i, name, &rw_field.data_type) { + Ok(val) => val, + Err(e) => { + let e = BatchExternalSystemError(e); + return Err(e.into()); + } + }; + datums.push(datum); + } + Ok(OwnedRow::new(datums)) +} + +impl MySqlQueryExecutor { + pub fn new( + schema: Schema, + host: String, + port: String, + username: String, + password: String, + database: String, + query: String, + identity: String, + chunk_size: usize, + ) -> Self { + Self { + schema, + host, + port, + username, + password, + database, + query, + identity, + chunk_size, + } + } + + #[try_stream(ok = DataChunk, error = BatchError)] + async fn do_execute(self: Box) { + tracing::debug!("mysql_query_executor: started"); + let database_opts: mysql_async::Opts = mysql_async::OptsBuilder::default() + .ip_or_hostname(self.host) + .tcp_port(self.port.parse::().unwrap()) // FIXME + .user(Some(self.username)) + .pass(Some(self.password)) + .db_name(Some(self.database)) + .into(); + + let pool = mysql_async::Pool::new(database_opts); + let mut conn = pool + .get_conn() + .await + .context("failed to connect to mysql in batch executor")?; + + let query = self.query; + let mut query_iter = conn + .query_iter(query) + .await + .context("failed to execute my_sql_query in batch executor")?; + let Some(row_stream) = query_iter.stream::().await? else { + bail!("failed to get row stream from mysql query") + }; + + let mut builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size); + tracing::debug!("mysql_query_executor: query executed, start deserializing rows"); + // deserialize the rows + #[for_await] + for row in row_stream { + let row = row?; + let owned_row = mysql_row_to_owned_row(row, &self.schema)?; + if let Some(chunk) = builder.append_one_row(owned_row) { + yield chunk; + } + } + if let Some(chunk) = builder.consume_all() { + yield chunk; + } + return Ok(()); + } +} + +pub struct MySqlQueryExecutorBuilder {} + +#[async_trait::async_trait] +impl BoxedExecutorBuilder for MySqlQueryExecutorBuilder { + async fn new_boxed_executor( + source: &ExecutorBuilder<'_, C>, + _inputs: Vec, + ) -> crate::error::Result { + let mysql_query_node = try_match_expand!( + source.plan_node().get_node_body().unwrap(), + NodeBody::MysqlQuery + )?; + + Ok(Box::new(MySqlQueryExecutor::new( + Schema::from_iter(mysql_query_node.columns.iter().map(Field::from)), + mysql_query_node.hostname.clone(), + mysql_query_node.port.clone(), + mysql_query_node.username.clone(), + mysql_query_node.password.clone(), + mysql_query_node.database.clone(), + mysql_query_node.query.clone(), + source.plan_node().get_identity().clone(), + source.context.get_config().developer.chunk_size, + ))) + } +} diff --git a/src/batch/src/executor/postgres_query.rs b/src/batch/src/executor/postgres_query.rs index 2b6524a2e45e7..4ae1fcba65da9 100644 --- a/src/batch/src/executor/postgres_query.rs +++ b/src/batch/src/executor/postgres_query.rs @@ -37,6 +37,7 @@ pub struct PostgresQueryExecutor { database: String, query: String, identity: String, + chunk_size: usize, } impl Executor for PostgresQueryExecutor { @@ -115,6 +116,7 @@ impl PostgresQueryExecutor { database: String, query: String, identity: String, + chunk_size: usize, ) -> Self { Self { schema, @@ -125,6 +127,7 @@ impl PostgresQueryExecutor { database, query, identity, + chunk_size, } } @@ -151,7 +154,7 @@ impl PostgresQueryExecutor { .query_raw(&self.query, params) .await .context("postgres_query received error from remote server")?; - let mut builder = DataChunkBuilder::new(self.schema.data_types(), 1024); + let mut builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size); tracing::debug!("postgres_query_executor: query executed, start deserializing rows"); // deserialize the rows #[for_await] @@ -191,6 +194,7 @@ impl BoxedExecutorBuilder for PostgresQueryExecutorBuilder { postgres_query_node.database.clone(), postgres_query_node.query.clone(), source.plan_node().get_identity().clone(), + source.context.get_config().developer.chunk_size, ))) } } diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index b47076c645949..b44aba68b0985 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -91,6 +91,7 @@ pub mod test_utils; pub mod transaction; pub mod types; pub mod vnode_mapping; + pub mod test_prelude { pub use super::array::{DataChunkTestExt, StreamChunkTestExt}; pub use super::catalog::test_utils::ColumnDescTestExt; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 9fd49fea88c7b..44fb2d7ba840f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -73,9 +73,7 @@ jsonwebtoken = "9.2.0" maplit = "1.0.2" moka = { version = "0.12.0", features = ["future"] } mongodb = { version = "2.8.2", features = ["tokio-runtime"] } -mysql_async = { version = "0.34", default-features = false, features = [ - "default", -] } +mysql_async = { workspace = true } mysql_common = { version = "0.32", default-features = false, features = [ "chrono", ] } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 53621721ac25b..d8bd4a9dbcd56 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -44,7 +44,7 @@ use thiserror_ext::AsReport; use self::avro::AvroAccessBuilder; use self::bytes_parser::BytesAccessBuilder; -pub use self::mysql::mysql_row_to_owned_row; +pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row}; use self::plain_parser::PlainParser; pub use self::postgres::postgres_row_to_owned_row; use self::simd_json_parser::DebeziumJsonAccessBuilder; diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index fe9b77c643de7..e9a8eeba70cb3 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -14,128 +14,201 @@ use std::sync::LazyLock; -use chrono::NaiveDate; use mysql_async::Row as MysqlRow; use risingwave_common::catalog::Schema; use risingwave_common::log::LogSuppresser; use risingwave_common::row::OwnedRow; -use risingwave_common::types::{ - DataType, Date, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, -}; -use rust_decimal::Decimal as RustDecimal; use thiserror_ext::AsReport; use crate::parser::util::log_error; static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); +use anyhow::anyhow; +use chrono::NaiveDate; +use risingwave_common::bail; +use risingwave_common::types::{ + DataType, Date, Datum, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, +}; +use rust_decimal::Decimal as RustDecimal; macro_rules! handle_data_type { ($row:expr, $i:expr, $name:expr, $type:ty) => {{ - let res = $row.take_opt::, _>($i).unwrap_or(Ok(None)); - match res { - Ok(val) => val.map(|v| ScalarImpl::from(v)), - Err(err) => { - log_error!($name, err, "parse column failed"); - None - } + match $row.take_opt::, _>($i) { + None => bail!("no value found at column: {}, index: {}", $name, $i), + Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(v))), + Some(Err(e)) => Err(anyhow::Error::new(e.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: {}", + $name, + $i, + stringify!($type), + ))), } }}; ($row:expr, $i:expr, $name:expr, $type:ty, $rw_type:ty) => {{ - let res = $row.take_opt::, _>($i).unwrap_or(Ok(None)); - match res { - Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))), - Err(err) => { - log_error!($name, err, "parse column failed"); - None - } + match $row.take_opt::, _>($i) { + None => bail!("no value found at column: {}, index: {}", $name, $i), + Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(<$rw_type>::from(v)))), + Some(Err(e)) => Err(anyhow::Error::new(e.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: {}", + $name, + $i, + stringify!($ty), + ))), } }}; } +/// The decoding result can be interpreted as follows: +/// Ok(value) => The value was found and successfully decoded. +/// Err(error) => The value was found but could not be decoded, +/// either because it was not supported, +/// or there was an error during conversion. +pub fn mysql_datum_to_rw_datum( + mysql_row: &mut MysqlRow, + mysql_datum_index: usize, + column_name: &str, + rw_data_type: &DataType, +) -> Result { + match rw_data_type { + DataType::Boolean => { + // Bit(1) + match mysql_row.take_opt::>, _>(mysql_datum_index) { + None => bail!( + "no value found at column: {}, index: {}", + column_name, + mysql_datum_index + ), + Some(Ok(val)) => match val { + None => Ok(None), + Some(val) => match val.as_slice() { + [0] => Ok(Some(ScalarImpl::from(false))), + [1] => Ok(Some(ScalarImpl::from(true))), + _ => Err(anyhow!("invalid value for boolean: {:?}", val)), + }, + }, + Some(Err(e)) => Err(anyhow::Error::new(e.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: Vec", + column_name, mysql_datum_index, + ))), + } + } + DataType::Int16 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, i16) + } + DataType::Int32 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, i32) + } + DataType::Int64 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, i64) + } + DataType::Float32 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, f32) + } + DataType::Float64 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, f64) + } + DataType::Decimal => { + handle_data_type!( + mysql_row, + mysql_datum_index, + column_name, + RustDecimal, + Decimal + ) + } + DataType::Varchar => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, String) + } + DataType::Date => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, NaiveDate, Date) + } + DataType::Time => { + handle_data_type!( + mysql_row, + mysql_datum_index, + column_name, + chrono::NaiveTime, + Time + ) + } + DataType::Timestamp => { + handle_data_type!( + mysql_row, + mysql_datum_index, + column_name, + chrono::NaiveDateTime, + Timestamp + ) + } + DataType::Timestamptz => { + match mysql_row.take_opt::, _>(mysql_datum_index) { + None => bail!( + "no value found at column: {}, index: {}", + column_name, + mysql_datum_index + ), + Some(Ok(val)) => Ok(val.map(|v| { + ScalarImpl::from(Timestamptz::from_micros(v.and_utc().timestamp_micros())) + })), + Some(Err(err)) => Err(anyhow::Error::new(err.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: chrono::NaiveDateTime", + column_name, mysql_datum_index, + ))), + } + } + DataType::Bytea => match mysql_row.take_opt::>, _>(mysql_datum_index) { + None => bail!( + "no value found at column: {}, index: {}", + column_name, + mysql_datum_index + ), + Some(Ok(val)) => Ok(val.map(ScalarImpl::from)), + Some(Err(err)) => Err(anyhow::Error::new(err.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: Vec", + column_name, mysql_datum_index, + ))), + }, + DataType::Jsonb => { + handle_data_type!( + mysql_row, + mysql_datum_index, + column_name, + serde_json::Value, + JsonbVal + ) + } + DataType::Interval + | DataType::Struct(_) + | DataType::List(_) + | DataType::Int256 + | DataType::Serial + | DataType::Map(_) => Err(anyhow!( + "unsupported data type: {}, set to null", + rw_data_type + )), + } +} + pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> OwnedRow { let mut datums = vec![]; for i in 0..schema.fields.len() { let rw_field = &schema.fields[i]; let name = rw_field.name.as_str(); - let datum = { - match rw_field.data_type { - DataType::Boolean => { - handle_data_type!(mysql_row, i, name, bool) - } - DataType::Int16 => { - handle_data_type!(mysql_row, i, name, i16) - } - DataType::Int32 => { - handle_data_type!(mysql_row, i, name, i32) - } - DataType::Int64 => { - handle_data_type!(mysql_row, i, name, i64) - } - DataType::Float32 => { - handle_data_type!(mysql_row, i, name, f32) - } - DataType::Float64 => { - handle_data_type!(mysql_row, i, name, f64) - } - DataType::Decimal => { - handle_data_type!(mysql_row, i, name, RustDecimal, Decimal) - } - DataType::Varchar => { - handle_data_type!(mysql_row, i, name, String) - } - DataType::Date => { - handle_data_type!(mysql_row, i, name, NaiveDate, Date) - } - DataType::Time => { - handle_data_type!(mysql_row, i, name, chrono::NaiveTime, Time) - } - DataType::Timestamp => { - handle_data_type!(mysql_row, i, name, chrono::NaiveDateTime, Timestamp) - } - DataType::Timestamptz => { - let res = mysql_row - .take_opt::, _>(i) - .unwrap_or(Ok(None)); - match res { - Ok(val) => val.map(|v| { - ScalarImpl::from(Timestamptz::from_micros( - v.and_utc().timestamp_micros(), - )) - }), - Err(err) => { - log_error!(name, err, "parse column failed"); - None - } - } - } - DataType::Bytea => { - let res = mysql_row - .take_opt::>, _>(i) - .unwrap_or(Ok(None)); - match res { - Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())), - Err(err) => { - log_error!(name, err, "parse column failed"); - None - } - } - } - DataType::Jsonb => { - handle_data_type!(mysql_row, i, name, serde_json::Value, JsonbVal) - } - DataType::Interval - | DataType::Struct(_) - | DataType::List(_) - | DataType::Int256 - | DataType::Serial - | DataType::Map(_) => { - // Interval, Struct, List, Int256 are not supported - // XXX: is this branch reachable? - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::warn!(column = rw_field.name, ?rw_field.data_type, suppressed_count, "unsupported data type, set to null"); - } - None - } + let datum = match mysql_datum_to_rw_datum(mysql_row, i, name, &rw_field.data_type) { + Ok(val) => val, + Err(e) => { + log_error!(name, e, "parse column failed"); + None } }; datums.push(datum); diff --git a/src/error/src/anyhow.rs b/src/error/src/anyhow.rs index 08203c176fcbc..30a46259ffdb5 100644 --- a/src/error/src/anyhow.rs +++ b/src/error/src/anyhow.rs @@ -126,7 +126,7 @@ macro_rules! def_anyhow_newtype { ) => { #[derive(::thiserror::Error, ::std::fmt::Debug)] #[error(transparent)] - $(#[$attr])* $vis struct $name(#[from] #[backtrace] ::anyhow::Error); + $(#[$attr])* $vis struct $name(#[from] #[backtrace] pub ::anyhow::Error); impl $name { /// Unwrap the newtype to get the inner [`anyhow::Error`]. diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index bbb2951e162f1..76ee2aa076e8e 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -45,6 +45,9 @@ linkme = { version = "0.3", features = ["used_linker"] } maplit = "1" md5 = "0.7.0" memcomparable = "0.2" +mysql_async = { version = "0.34", default-features = false, features = [ + "default", +] } num-integer = "0.1" parking_lot = { workspace = true } parse-display = "0.10" diff --git a/src/frontend/src/binder/expr/function/mod.rs b/src/frontend/src/binder/expr/function/mod.rs index ddc21c6ee7ac7..f7a4007ffd467 100644 --- a/src/frontend/src/binder/expr/function/mod.rs +++ b/src/frontend/src/binder/expr/function/mod.rs @@ -333,6 +333,17 @@ impl Binder { .context("postgres_query error")? .into()); } + // `mysql_query` table function + if func_name.eq("mysql_query") { + reject_syntax!( + arg_list.variadic, + "`VARIADIC` is not allowed in table function call" + ); + self.ensure_table_function_allowed()?; + return Ok(TableFunction::new_mysql_query(args) + .context("mysql_query error")? + .into()); + } // UDTF if let Some(ref udf) = udf && udf.kind.is_table() diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 5f22398cc5834..cee4188e75791 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -14,7 +14,10 @@ use std::sync::{Arc, LazyLock}; +use anyhow::Context; use itertools::Itertools; +use mysql_async::consts::ColumnType as MySqlColumnType; +use mysql_async::prelude::*; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{create_parquet_stream_builder, list_s3_directory}; @@ -22,8 +25,8 @@ pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; use thiserror_ext::AsReport; use tokio::runtime::Runtime; -use tokio_postgres; use tokio_postgres::types::Type as TokioPgType; +use {mysql_async, tokio_postgres}; use super::{infer_type, Expr, ExprImpl, ExprRewriter, Literal, RwResult}; use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind}; @@ -298,7 +301,7 @@ impl TableFunction { tokio::spawn(async move { if let Err(e) = connection.await { tracing::error!( - "postgres_query_executor: connection error: {:?}", + "mysql_query_executor: connection error: {:?}", e.as_report() ); } @@ -350,6 +353,162 @@ impl TableFunction { } } + pub fn new_mysql_query(args: Vec) -> RwResult { + static MYSQL_ARGS_LEN: usize = 6; + let args = { + if args.len() != MYSQL_ARGS_LEN { + return Err(BindError("mysql_query function only accepts 6 arguments: mysql_query(hostname varchar, port varchar, username varchar, password varchar, database_name varchar, mysql_query varchar)".to_string()).into()); + } + let mut cast_args = Vec::with_capacity(MYSQL_ARGS_LEN); + for arg in args { + let arg = arg.cast_implicit(DataType::Varchar)?; + cast_args.push(arg); + } + cast_args + }; + let evaled_args = { + let mut evaled_args: Vec = Vec::with_capacity(MYSQL_ARGS_LEN); + for arg in &args { + match arg.try_fold_const() { + Some(Ok(value)) => { + let Some(scalar) = value else { + return Err(BindError( + "mysql_query function does not accept null arguments".to_string(), + ) + .into()); + }; + evaled_args.push(scalar.into_utf8().into()); + } + Some(Err(err)) => { + return Err(err); + } + None => { + return Err(BindError( + "mysql_query function only accepts constant arguments".to_string(), + ) + .into()); + } + } + } + evaled_args + }; + + #[cfg(madsim)] + { + return Err(crate::error::ErrorCode::BindError( + "postgres_query can't be used in the madsim mode".to_string(), + ) + .into()); + } + + #[cfg(not(madsim))] + { + let schema = tokio::task::block_in_place(|| { + RUNTIME.block_on(async { + let database_opts: mysql_async::Opts = { + let port = evaled_args[1] + .parse::() + .context("failed to parse port")?; + mysql_async::OptsBuilder::default() + .ip_or_hostname(evaled_args[0].clone()) + .tcp_port(port) + .user(Some(evaled_args[2].clone())) + .pass(Some(evaled_args[3].clone())) + .db_name(Some(evaled_args[4].clone())) + .into() + }; + + let pool = mysql_async::Pool::new(database_opts); + let mut conn = pool + .get_conn() + .await + .context("failed to connect to mysql in binder")?; + + let query = evaled_args[5].clone(); + let statement = conn + .prep(query) + .await + .context("failed to prepare mysql_query in binder")?; + + let mut rw_types = vec![]; + #[allow(clippy::never_loop)] + for column in statement.columns() { + let name = column.name_str().to_string(); + let data_type = match column.column_type() { + // Boolean types + MySqlColumnType::MYSQL_TYPE_BIT if column.column_length() == 1 => { + DataType::Boolean + } + + // Numeric types + // NOTE(kwannoel): Although `bool/boolean` is a synonym of TINY(1) in MySQL, + // we treat it as Int16 here. It is better to be straightforward in our conversion. + MySqlColumnType::MYSQL_TYPE_TINY => DataType::Int16, + MySqlColumnType::MYSQL_TYPE_SHORT => DataType::Int16, + MySqlColumnType::MYSQL_TYPE_INT24 => DataType::Int32, + MySqlColumnType::MYSQL_TYPE_LONG => DataType::Int32, + MySqlColumnType::MYSQL_TYPE_LONGLONG => DataType::Int64, + MySqlColumnType::MYSQL_TYPE_FLOAT => DataType::Float32, + MySqlColumnType::MYSQL_TYPE_DOUBLE => DataType::Float64, + MySqlColumnType::MYSQL_TYPE_NEWDECIMAL => DataType::Decimal, + MySqlColumnType::MYSQL_TYPE_DECIMAL => DataType::Decimal, + + // Date time types + MySqlColumnType::MYSQL_TYPE_YEAR => DataType::Int32, + MySqlColumnType::MYSQL_TYPE_DATE => DataType::Date, + MySqlColumnType::MYSQL_TYPE_NEWDATE => DataType::Date, + MySqlColumnType::MYSQL_TYPE_TIME => DataType::Time, + MySqlColumnType::MYSQL_TYPE_TIME2 => DataType::Time, + MySqlColumnType::MYSQL_TYPE_DATETIME => DataType::Timestamp, + MySqlColumnType::MYSQL_TYPE_DATETIME2 => DataType::Timestamp, + MySqlColumnType::MYSQL_TYPE_TIMESTAMP => DataType::Timestamptz, + MySqlColumnType::MYSQL_TYPE_TIMESTAMP2 => DataType::Timestamptz, + + // String types + MySqlColumnType::MYSQL_TYPE_VARCHAR + | MySqlColumnType::MYSQL_TYPE_STRING + | MySqlColumnType::MYSQL_TYPE_VAR_STRING => DataType::Varchar, + + // JSON types + MySqlColumnType::MYSQL_TYPE_JSON => DataType::Jsonb, + + // Binary types + MySqlColumnType::MYSQL_TYPE_BIT + | MySqlColumnType::MYSQL_TYPE_BLOB + | MySqlColumnType::MYSQL_TYPE_TINY_BLOB + | MySqlColumnType::MYSQL_TYPE_MEDIUM_BLOB + | MySqlColumnType::MYSQL_TYPE_LONG_BLOB => DataType::Bytea, + + MySqlColumnType::MYSQL_TYPE_UNKNOWN + | MySqlColumnType::MYSQL_TYPE_TYPED_ARRAY + | MySqlColumnType::MYSQL_TYPE_ENUM + | MySqlColumnType::MYSQL_TYPE_SET + | MySqlColumnType::MYSQL_TYPE_GEOMETRY + | MySqlColumnType::MYSQL_TYPE_NULL => { + return Err(crate::error::ErrorCode::BindError( + format!("unsupported column type: {:?}", column.column_type()) + .to_string(), + ) + .into()); + } + }; + rw_types.push((name, data_type)); + } + Ok::(DataType::Struct( + StructType::new(rw_types), + )) + }) + })?; + + Ok(TableFunction { + args, + return_type: schema, + function_type: TableFunctionType::MysqlQuery, + user_defined: None, + }) + } + } + pub fn to_protobuf(&self) -> PbTableFunction { PbTableFunction { function_type: self.function_type as i32, diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index 766cde4ecfc7e..e413188154569 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -136,6 +136,8 @@ static TABLE_FUNCTION_CONVERT: LazyLock = LazyLock::new(|| { TableFunctionToFileScanRule::create(), // Apply postgres query rule next TableFunctionToPostgresQueryRule::create(), + // Apply mysql query rule next + TableFunctionToMySqlQueryRule::create(), // Apply project set rule last TableFunctionToProjectSetRule::create(), ], @@ -159,6 +161,14 @@ static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock = LazyLock: ) }); +static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock = LazyLock::new(|| { + OptimizationStage::new( + "Table Function To MySQL", + vec![TableFunctionToMySqlQueryRule::create()], + ApplyOrder::TopDown, + ) +}); + static VALUES_EXTRACT_PROJECT: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Values Extract Project", @@ -713,6 +723,7 @@ impl LogicalOptimizer { // Table function should be converted into `file_scan` before `project_set`. plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN); plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY); + plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY); // In order to unnest a table function, we need to convert it into a `project_set` first. plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT); diff --git a/src/frontend/src/optimizer/plan_node/batch_mysql_query.rs b/src/frontend/src/optimizer/plan_node/batch_mysql_query.rs new file mode 100644 index 0000000000000..308b1e82c63f3 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/batch_mysql_query.rs @@ -0,0 +1,96 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pretty_xmlish::XmlNode; +use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::batch_plan::MySqlQueryNode; + +use super::batch::prelude::*; +use super::utils::{childless_record, column_names_pretty, Distill}; +use super::{ + generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, +}; +use crate::error::Result; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::{Distribution, Order}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BatchMySqlQuery { + pub base: PlanBase, + pub core: generic::MySqlQuery, +} + +impl BatchMySqlQuery { + pub fn new(core: generic::MySqlQuery) -> Self { + let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any()); + + Self { base, core } + } + + pub fn column_names(&self) -> Vec<&str> { + self.schema().names_str() + } + + pub fn clone_with_dist(&self) -> Self { + let base = self.base.clone_with_new_distribution(Distribution::Single); + Self { + base, + core: self.core.clone(), + } + } +} + +impl_plan_tree_node_for_leaf! { BatchMySqlQuery } + +impl Distill for BatchMySqlQuery { + fn distill<'a>(&self) -> XmlNode<'a> { + let fields = vec![("columns", column_names_pretty(self.schema()))]; + childless_record("BatchMySqlQuery", fields) + } +} + +impl ToLocalBatch for BatchMySqlQuery { + fn to_local(&self) -> Result { + Ok(self.clone_with_dist().into()) + } +} + +impl ToDistributedBatch for BatchMySqlQuery { + fn to_distributed(&self) -> Result { + Ok(self.clone_with_dist().into()) + } +} + +impl ToBatchPb for BatchMySqlQuery { + fn to_batch_prost_body(&self) -> NodeBody { + NodeBody::MysqlQuery(MySqlQueryNode { + columns: self + .core + .columns() + .iter() + .map(|c| c.to_protobuf()) + .collect(), + hostname: self.core.hostname.clone(), + port: self.core.port.clone(), + username: self.core.username.clone(), + password: self.core.password.clone(), + database: self.core.database.clone(), + query: self.core.query.clone(), + }) + } +} + +impl ExprRewritable for BatchMySqlQuery {} + +impl ExprVisitable for BatchMySqlQuery {} diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 6a076025b906c..c35a367e8ccec 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -92,6 +92,9 @@ pub use file_scan::*; mod postgres_query; pub use postgres_query::*; +mod mysql_query; +pub use mysql_query::*; + pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; } diff --git a/src/frontend/src/optimizer/plan_node/generic/mysql_query.rs b/src/frontend/src/optimizer/plan_node/generic/mysql_query.rs new file mode 100644 index 0000000000000..03bbfa0b229eb --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/mysql_query.rs @@ -0,0 +1,67 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use educe::Educe; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; + +use super::GenericPlanNode; +use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct MySqlQuery { + pub schema: Schema, + pub hostname: String, + pub port: String, + pub username: String, + pub password: String, + pub database: String, + pub query: String, + + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, +} + +impl GenericPlanNode for MySqlQuery { + fn schema(&self) -> Schema { + self.schema.clone() + } + + fn stream_key(&self) -> Option> { + None + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + FunctionalDependencySet::new(self.schema.len()) + } +} + +impl MySqlQuery { + pub fn columns(&self) -> Vec { + self.schema + .fields + .iter() + .enumerate() + .map(|(i, f)| { + ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone()) + }) + .collect() + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_mysql_query.rs b/src/frontend/src/optimizer/plan_node/logical_mysql_query.rs new file mode 100644 index 0000000000000..1512fe60120a3 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/logical_mysql_query.rs @@ -0,0 +1,115 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pretty_xmlish::XmlNode; +use risingwave_common::bail; +use risingwave_common::catalog::Schema; + +use super::generic::GenericPlanRef; +use super::utils::{childless_record, Distill}; +use super::{ + generic, BatchMySqlQuery, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, + PlanRef, PredicatePushdown, ToBatch, ToStream, +}; +use crate::error::Result; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::utils::column_names_pretty; +use crate::optimizer::plan_node::{ + ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext, + ToStreamContext, +}; +use crate::utils::{ColIndexMapping, Condition}; +use crate::OptimizerContextRef; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LogicalMySqlQuery { + pub base: PlanBase, + pub core: generic::MySqlQuery, +} + +impl LogicalMySqlQuery { + pub fn new( + ctx: OptimizerContextRef, + schema: Schema, + hostname: String, + port: String, + username: String, + password: String, + database: String, + query: String, + ) -> Self { + let core = generic::MySqlQuery { + schema, + hostname, + port, + username, + password, + database, + query, + ctx, + }; + + let base = PlanBase::new_logical_with_core(&core); + + LogicalMySqlQuery { base, core } + } +} + +impl_plan_tree_node_for_leaf! {LogicalMySqlQuery} +impl Distill for LogicalMySqlQuery { + fn distill<'a>(&self) -> XmlNode<'a> { + let fields = vec![("columns", column_names_pretty(self.schema()))]; + childless_record("LogicalMySqlQuery", fields) + } +} + +impl ColPrunable for LogicalMySqlQuery { + fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef { + LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into() + } +} + +impl ExprRewritable for LogicalMySqlQuery {} + +impl ExprVisitable for LogicalMySqlQuery {} + +impl PredicatePushdown for LogicalMySqlQuery { + fn predicate_pushdown( + &self, + predicate: Condition, + _ctx: &mut PredicatePushdownContext, + ) -> PlanRef { + // No pushdown. + LogicalFilter::create(self.clone().into(), predicate) + } +} + +impl ToBatch for LogicalMySqlQuery { + fn to_batch(&self) -> Result { + Ok(BatchMySqlQuery::new(self.core.clone()).into()) + } +} + +impl ToStream for LogicalMySqlQuery { + fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { + bail!("mysql_query function is not supported in streaming mode") + } + + fn logical_rewrite_for_stream( + &self, + _ctx: &mut RewriteStreamContext, + ) -> Result<(PlanRef, ColIndexMapping)> { + bail!("mysql_query function is not supported in streaming mode") + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_postgres_query.rs b/src/frontend/src/optimizer/plan_node/logical_postgres_query.rs index 9082bd86a3f37..d3d793fb8ba01 100644 --- a/src/frontend/src/optimizer/plan_node/logical_postgres_query.rs +++ b/src/frontend/src/optimizer/plan_node/logical_postgres_query.rs @@ -103,13 +103,13 @@ impl ToBatch for LogicalPostgresQuery { impl ToStream for LogicalPostgresQuery { fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - bail!("file_scan function is not supported in streaming mode") + bail!("postgres_query function is not supported in streaming mode") } fn logical_rewrite_for_stream( &self, _ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - bail!("file_scan function is not supported in streaming mode") + bail!("postgres_query function is not supported in streaming mode") } } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 9b814ab8289c2..432475b52809b 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -935,10 +935,14 @@ mod batch_file_scan; mod batch_iceberg_scan; mod batch_kafka_scan; mod batch_postgres_query; + +mod batch_mysql_query; mod derive; mod logical_file_scan; mod logical_iceberg_scan; mod logical_postgres_query; + +mod logical_mysql_query; mod stream_cdc_table_scan; mod stream_share; mod stream_temporal_join; @@ -961,6 +965,7 @@ pub use batch_limit::BatchLimit; pub use batch_log_seq_scan::BatchLogSeqScan; pub use batch_lookup_join::BatchLookupJoin; pub use batch_max_one_row::BatchMaxOneRow; +pub use batch_mysql_query::BatchMySqlQuery; pub use batch_nested_loop_join::BatchNestedLoopJoin; pub use batch_over_window::BatchOverWindow; pub use batch_postgres_query::BatchPostgresQuery; @@ -997,6 +1002,7 @@ pub use logical_kafka_scan::LogicalKafkaScan; pub use logical_limit::LogicalLimit; pub use logical_max_one_row::LogicalMaxOneRow; pub use logical_multi_join::{LogicalMultiJoin, LogicalMultiJoinBuilder}; +pub use logical_mysql_query::LogicalMySqlQuery; pub use logical_now::LogicalNow; pub use logical_over_window::LogicalOverWindow; pub use logical_postgres_query::LogicalPostgresQuery; @@ -1112,6 +1118,7 @@ macro_rules! for_all_plan_nodes { , { Logical, ChangeLog } , { Logical, FileScan } , { Logical, PostgresQuery } + , { Logical, MySqlQuery } , { Batch, SimpleAgg } , { Batch, HashAgg } , { Batch, SortAgg } @@ -1144,6 +1151,7 @@ macro_rules! for_all_plan_nodes { , { Batch, IcebergScan } , { Batch, FileScan } , { Batch, PostgresQuery } + , { Batch, MySqlQuery } , { Stream, Project } , { Stream, Filter } , { Stream, TableScan } @@ -1226,6 +1234,7 @@ macro_rules! for_logical_plan_nodes { , { Logical, ChangeLog } , { Logical, FileScan } , { Logical, PostgresQuery } + , { Logical, MySqlQuery } } }; } @@ -1267,6 +1276,7 @@ macro_rules! for_batch_plan_nodes { , { Batch, IcebergScan } , { Batch, FileScan } , { Batch, PostgresQuery } + , { Batch, MySqlQuery } } }; } diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 56d79bf7b408b..7468f1c96524c 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -161,6 +161,7 @@ mod pull_up_correlated_predicate_agg_rule; mod source_to_iceberg_scan_rule; mod source_to_kafka_scan_rule; mod table_function_to_file_scan_rule; +mod table_function_to_mysql_query_rule; mod table_function_to_postgres_query_rule; mod values_extract_project_rule; @@ -169,6 +170,7 @@ pub use pull_up_correlated_predicate_agg_rule::*; pub use source_to_iceberg_scan_rule::*; pub use source_to_kafka_scan_rule::*; pub use table_function_to_file_scan_rule::*; +pub use table_function_to_mysql_query_rule::*; pub use table_function_to_postgres_query_rule::*; pub use values_extract_project_rule::*; @@ -234,6 +236,7 @@ macro_rules! for_all_rules { , { TableFunctionToProjectSetRule } , { TableFunctionToFileScanRule } , { TableFunctionToPostgresQueryRule } + , { TableFunctionToMySqlQueryRule } , { ApplyLimitTransposeRule } , { CommonSubExprExtractRule } , { BatchProjectMergeRule } diff --git a/src/frontend/src/optimizer/rule/table_function_to_mysql_query_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_mysql_query_rule.rs new file mode 100644 index 0000000000000..0ad534825790c --- /dev/null +++ b/src/frontend/src/optimizer/rule/table_function_to_mysql_query_rule.rs @@ -0,0 +1,91 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_common::util::iter_util::ZipEqDebug; + +use super::{BoxedRule, Rule}; +use crate::expr::{Expr, TableFunctionType}; +use crate::optimizer::plan_node::generic::GenericPlanRef; +// use crate::optimizer::plan_node::{LogicalMySqlQuery, LogicalTableFunction}; +use crate::optimizer::plan_node::{LogicalMySqlQuery, LogicalTableFunction}; +use crate::optimizer::PlanRef; + +/// Transform a special `TableFunction` (with `MYSQL_QUERY` table function type) into a `LogicalMySqlQuery` +pub struct TableFunctionToMySqlQueryRule {} +impl Rule for TableFunctionToMySqlQueryRule { + fn apply(&self, plan: PlanRef) -> Option { + let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?; + if logical_table_function.table_function.function_type != TableFunctionType::MysqlQuery { + return None; + } + assert!(!logical_table_function.with_ordinality); + let table_function_return_type = logical_table_function.table_function().return_type(); + + if let DataType::Struct(st) = table_function_return_type.clone() { + let fields = st + .types() + .zip_eq_debug(st.names()) + .map(|(data_type, name)| Field::with_name(data_type.clone(), name.to_string())) + .collect_vec(); + + let schema = Schema::new(fields); + + assert_eq!(logical_table_function.table_function().args.len(), 6); + let mut eval_args = vec![]; + for arg in &logical_table_function.table_function().args { + assert_eq!(arg.return_type(), DataType::Varchar); + let value = arg.try_fold_const().unwrap().unwrap(); + match value { + Some(ScalarImpl::Utf8(s)) => { + eval_args.push(s.to_string()); + } + _ => { + unreachable!("must be a varchar") + } + } + } + let hostname = eval_args[0].clone(); + let port = eval_args[1].clone(); + let username = eval_args[2].clone(); + let password = eval_args[3].clone(); + let database = eval_args[4].clone(); + let query = eval_args[5].clone(); + + Some( + LogicalMySqlQuery::new( + logical_table_function.ctx(), + schema, + hostname, + port, + username, + password, + database, + query, + ) + .into(), + ) + } else { + unreachable!("TableFunction return type should be struct") + } + } +} + +impl TableFunctionToMySqlQueryRule { + pub fn create() -> BoxedRule { + Box::new(TableFunctionToMySqlQueryRule {}) + } +}