diff --git a/e2e_test/slow_tests/udf/always_retry_python.slt b/e2e_test/slow_tests/udf/always_retry_python.slt index 18184846f272a..c493cd81bbe77 100644 --- a/e2e_test/slow_tests/udf/always_retry_python.slt +++ b/e2e_test/slow_tests/udf/always_retry_python.slt @@ -67,10 +67,10 @@ system ok pkill -i python statement ok -DROP FUNCTION sleep_always_retry; +DROP TABLE t CASCADE; statement ok -DROP FUNCTION sleep_no_retry; +DROP FUNCTION sleep_always_retry; statement ok -DROP TABLE t CASCADE; \ No newline at end of file +DROP FUNCTION sleep_no_retry; diff --git a/e2e_test/udf/drop_function.slt b/e2e_test/udf/drop_function.slt new file mode 100644 index 0000000000000..b7d36bd97627a --- /dev/null +++ b/e2e_test/udf/drop_function.slt @@ -0,0 +1,44 @@ +# https://github.com/risingwavelabs/risingwave/issues/17263 + +statement ok +create table t (a int, b int); + +statement ok +create function add(a int, b int) returns int language python as $$ +def add(a, b): + return a+b +$$; + +statement ok +create materialized view mv as select add(a, b) as c from t; + +statement error function used by 1 other objects +drop function add; + +statement ok +drop materialized view mv; + +statement ok +drop function add; + + +statement ok +create function add(a int, b int) returns int language python as $$ +def add(a, b): + return a+b +$$; + +statement ok +create sink s as select add(a, b) as c from t with (connector = 'blackhole'); + +statement error function used by 1 other objects +drop function add; + +statement ok +drop sink s; + +statement ok +drop function add; + +statement ok +drop table t; diff --git a/proto/catalog.proto b/proto/catalog.proto index 9bb8e62fd4a11..663f146ff415e 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -170,7 +170,7 @@ message Sink { repeated plan_common.ColumnCatalog columns = 5; // Primary key derived from the SQL by the frontend. repeated common.ColumnOrder plan_pk = 6; - repeated uint32 dependent_relations = 7; + repeated uint32 dependent_relations = 7 [deprecated = true]; repeated int32 distribution_key = 8; // User-defined primary key indices for the upsert sink. repeated int32 downstream_pk = 9; @@ -342,7 +342,7 @@ message Table { repeated plan_common.ColumnCatalog columns = 5; repeated common.ColumnOrder pk = 6; // For cdc table created from a cdc source, here records the source id. - repeated uint32 dependent_relations = 8; + repeated uint32 dependent_relations = 8; // TODO(rc): deprecate this by passing dependencies via `Request` message oneof optional_associated_source_id { uint32 associated_source_id = 9; } diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 6467bd6e1d7e7..884296f76fed9 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -86,6 +86,8 @@ message CreateSinkRequest { stream_plan.StreamFragmentGraph fragment_graph = 2; // It is used to provide a replace plan for the downstream table in `create sink into table` requests. optional ReplaceTablePlan affected_table_change = 3; + // The list of object IDs that this sink depends on. + repeated uint32 dependencies = 4; } message CreateSinkResponse { @@ -136,6 +138,9 @@ message CreateMaterializedViewRequest { SERVERLESS = 2; } BackfillType backfill = 3; + + // The list of object IDs that this materialized view depends on. + repeated uint32 dependencies = 4; } message CreateMaterializedViewResponse { diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index e955525ba030e..1fbabdfe57771 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -168,6 +168,8 @@ pub trait SysCatalogReader: Sync + Send + 'static { pub type SysCatalogReaderRef = Arc; +pub type ObjectId = u32; + #[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)] #[display("{database_id}")] pub struct DatabaseId { diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 5f097eafa33c9..3c7df355da0ae 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -91,7 +91,6 @@ impl SinkDesc { database_id: DatabaseId, owner: UserId, connection_id: Option, - dependent_relations: Vec, ) -> SinkCatalog { SinkCatalog { id: self.id, @@ -104,7 +103,6 @@ impl SinkDesc { downstream_pk: self.downstream_pk, distribution_key: self.distribution_key, owner, - dependent_relations, properties: self.properties, secret_refs: self.secret_refs, sink_type: self.sink_type, diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index a2b8caa311253..5c9937196712e 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -343,9 +343,6 @@ pub struct SinkCatalog { /// Owner of the sink. pub owner: UserId, - // Relations on which the sink depends. - pub dependent_relations: Vec, - // The append-only behavior of the physical sink connector. Frontend will determine `sink_type` // based on both its own derivation on the append-only attribute and other user-specified // options in `properties`. @@ -382,6 +379,7 @@ pub struct SinkCatalog { impl SinkCatalog { pub fn to_proto(&self) -> PbSink { + #[allow(deprecated)] // for `dependent_relations` PbSink { id: self.id.into(), schema_id: self.schema_id.schema_id, @@ -395,11 +393,7 @@ impl SinkCatalog { .iter() .map(|idx| *idx as i32) .collect_vec(), - dependent_relations: self - .dependent_relations - .iter() - .map(|id| id.table_id) - .collect_vec(), + dependent_relations: vec![], distribution_key: self .distribution_key .iter() @@ -507,11 +501,6 @@ impl From for SinkCatalog { .collect_vec(), properties: pb.properties, owner: pb.owner.into(), - dependent_relations: pb - .dependent_relations - .into_iter() - .map(TableId::from) - .collect_vec(), sink_type: SinkType::from_proto(sink_type), format_desc, connection_id: pb.connection_id.map(ConnectionId), diff --git a/src/frontend/src/binder/expr/function/mod.rs b/src/frontend/src/binder/expr/function/mod.rs index f7a4007ffd467..61289a59ac993 100644 --- a/src/frontend/src/binder/expr/function/mod.rs +++ b/src/frontend/src/binder/expr/function/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; @@ -154,6 +154,8 @@ impl Binder { .flatten_ok() .try_collect()?; + let mut referred_udfs = HashSet::new(); + let wrapped_agg_type = if scalar_as_agg { // Let's firstly try to apply the `AGGREGATE:` prefix. // We will reject functions that are not able to be wrapped as aggregate function. @@ -167,12 +169,16 @@ impl Binder { let scalar_func_expr = if let Ok(schema) = self.first_valid_schema() && let Some(func) = schema.get_function_by_name_inputs(&func_name, &mut array_args) { + // record the dependency upon the UDF + referred_udfs.insert(func.id); + if !func.kind.is_scalar() { return Err(ErrorCode::InvalidInputSyntax( "expect a scalar function after `AGGREGATE:`".to_string(), ) .into()); } + if func.language == "sql" { self.bind_sql_udf(func.clone(), array_args)? } else { @@ -194,6 +200,9 @@ impl Binder { .get_function_by_name_inputs(&func_name, &mut args) .cloned() { + // record the dependency upon the UDF + referred_udfs.insert(func.id); + if func.language == "sql" { let name = format!("SQL user-defined function `{}`", func.name); reject_syntax!( @@ -228,6 +237,8 @@ impl Binder { None }; + self.included_udfs.extend(referred_udfs); + let agg_type = if wrapped_agg_type.is_some() { wrapped_agg_type } else if let Some(ref udf) = udf diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 4560e51bd6562..f0d42fd321f3f 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use itertools::Itertools; use parking_lot::RwLock; +use risingwave_common::catalog::FunctionId; use risingwave_common::session_config::{SearchPath, SessionConfig}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; @@ -121,6 +122,9 @@ pub struct Binder { /// The included relations while binding a query. included_relations: HashSet, + /// The included user-defined functions while binding a query. + included_udfs: HashSet, + param_types: ParameterTypes, /// The sql udf context that will be used during binding phase @@ -324,6 +328,7 @@ impl Binder { bind_for, shared_views: HashMap::new(), included_relations: HashSet::new(), + included_udfs: HashSet::new(), param_types: ParameterTypes::new(param_types), udf_context: UdfContext::new(), temporary_source_manager: session.temporary_source_manager(), @@ -382,13 +387,18 @@ impl Binder { self.param_types.export() } - /// Returns included relations in the query after binding. This is used for resolving relation + /// Get included relations in the query after binding. This is used for resolving relation /// dependencies. Note that it only contains referenced relations discovered during binding. /// After the plan is built, the referenced relations may be changed. We cannot rely on the /// collection result of plan, because we still need to record the dependencies that have been /// optimised away. - pub fn included_relations(&self) -> HashSet { - self.included_relations.clone() + pub fn included_relations(&self) -> &HashSet { + &self.included_relations + } + + /// Get included user-defined functions in the query after binding. + pub fn included_udfs(&self) -> &HashSet { + &self.included_udfs } fn push_context(&mut self) { diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 271d395181df8..cc7cac86570c5 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; use anyhow::anyhow; use parking_lot::lock_api::ArcRwLockReadGuard; use parking_lot::{RawRwLock, RwLock}; -use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId}; +use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, ObjectId}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::catalog::{ @@ -78,6 +79,7 @@ pub trait CatalogWriter: Send + Sync { &self, table: PbTable, graph: StreamFragmentGraph, + dependencies: HashSet, ) -> Result<()>; async fn create_table( @@ -115,6 +117,7 @@ pub trait CatalogWriter: Send + Sync { sink: PbSink, graph: StreamFragmentGraph, affected_table_change: Option, + dependencies: HashSet, ) -> Result<()>; async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>; @@ -246,11 +249,12 @@ impl CatalogWriter for CatalogWriterImpl { &self, table: PbTable, graph: StreamFragmentGraph, + dependencies: HashSet, ) -> Result<()> { let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground); let version = self .meta_client - .create_materialized_view(table, graph) + .create_materialized_view(table, graph, dependencies) .await?; if matches!(create_type, PbCreateType::Foreground) { self.wait_version(version).await? @@ -316,10 +320,11 @@ impl CatalogWriter for CatalogWriterImpl { sink: PbSink, graph: StreamFragmentGraph, affected_table_change: Option, + dependencies: HashSet, ) -> Result<()> { let version = self .meta_client - .create_sink(sink, graph, affected_table_change) + .create_sink(sink, graph, affected_table_change, dependencies) .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 2c913a6834d37..66105099432fc 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -15,10 +15,9 @@ use std::collections::HashSet; use either::Either; -use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::acl::AclMode; -use risingwave_common::catalog::TableId; +use risingwave_common::catalog::{FunctionId, ObjectId, TableId}; use risingwave_pb::catalog::PbTable; use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query}; @@ -93,15 +92,7 @@ pub fn gen_create_mv_plan( ) -> Result<(PlanRef, PbTable)> { let mut binder = Binder::new_for_stream(session); let bound = binder.bind_query(query)?; - gen_create_mv_plan_bound( - session, - context, - bound, - binder.included_relations(), - name, - columns, - emit_mode, - ) + gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode) } /// Generate create MV plan from a bound query @@ -109,7 +100,6 @@ pub fn gen_create_mv_plan_bound( session: &SessionImpl, context: OptimizerContextRef, query: BoundQuery, - dependent_relations: HashSet, name: ObjectName, columns: Vec, emit_mode: Option, @@ -147,17 +137,9 @@ pub fn gen_create_mv_plan_bound( let mut table = materialize.table().to_prost(schema_id, database_id); let plan: PlanRef = materialize.into(); - let dependent_relations = - RelationCollectorVisitor::collect_with(dependent_relations, plan.clone()); table.owner = session.user_id(); - // record dependent relations. - table.dependent_relations = dependent_relations - .into_iter() - .map(|t| t.table_id) - .collect_vec(); - let ctx = plan.ctx(); let explain_trace = ctx.is_explain_trace(); if explain_trace { @@ -176,10 +158,14 @@ pub async fn handle_create_mv( columns: Vec, emit_mode: Option, ) -> Result { - let (dependent_relations, bound) = { + let (dependent_relations, dependent_udfs, bound) = { let mut binder = Binder::new_for_stream(handler_args.session.as_ref()); let bound = binder.bind_query(query)?; - (binder.included_relations(), bound) + ( + binder.included_relations().clone(), + binder.included_udfs().clone(), + bound, + ) }; handle_create_mv_bound( handler_args, @@ -187,6 +173,7 @@ pub async fn handle_create_mv( name, bound, dependent_relations, + dependent_udfs, columns, emit_mode, ) @@ -199,6 +186,7 @@ pub async fn handle_create_mv_bound( name: ObjectName, query: BoundQuery, dependent_relations: HashSet, + dependent_udfs: HashSet, // TODO(rc): merge with `dependent_relations` columns: Vec, emit_mode: Option, ) -> Result { @@ -215,7 +203,7 @@ pub async fn handle_create_mv_bound( return Ok(resp); } - let (table, graph) = { + let (table, graph, dependencies) = { let context = OptimizerContext::from_handler_args(handler_args); if !context.with_options().is_empty() { // get other useful fields by `remove`, the logic here is to reject unknown options. @@ -232,19 +220,25 @@ It only indicates the physical clustering of the data, which may improve the per "#.to_string()); } - let (plan, table) = gen_create_mv_plan_bound( - &session, - context.into(), - query, - dependent_relations, - name, - columns, - emit_mode, - )?; + let (plan, table) = + gen_create_mv_plan_bound(&session, context.into(), query, name, columns, emit_mode)?; + + // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies + // during binding instead of visiting the optimized plan. + let dependencies = + RelationCollectorVisitor::collect_with(dependent_relations, plan.clone()) + .into_iter() + .map(|id| id.table_id() as ObjectId) + .chain( + dependent_udfs + .into_iter() + .map(|id| id.function_id() as ObjectId), + ) + .collect(); let graph = build_graph(plan)?; - (table, graph) + (table, graph, dependencies) }; // Ensure writes to `StreamJobTracker` are atomic. @@ -262,7 +256,7 @@ It only indicates the physical clustering of the data, which may improve the per let session = session.clone(); let catalog_writer = session.catalog_writer()?; catalog_writer - .create_materialized_view(table, graph) + .create_materialized_view(table, graph, dependencies) .await?; Ok(PgResponse::empty_result( diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index e280f90909267..7172e2ba7220b 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -23,7 +23,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::bail; -use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, UserId}; +use risingwave_common::catalog::{ColumnCatalog, DatabaseId, ObjectId, Schema, SchemaId, UserId}; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; @@ -72,6 +72,7 @@ pub struct SinkPlanContext { pub sink_plan: PlanRef, pub sink_catalog: SinkCatalog, pub target_table_catalog: Option>, + pub dependencies: HashSet, } pub async fn gen_sink_plan( @@ -122,10 +123,14 @@ pub async fn gen_sink_plan( let (sink_database_id, sink_schema_id) = session.get_database_and_schema_id_for_create(sink_schema_name.clone())?; - let (dependent_relations, bound) = { + let (dependent_relations, dependent_udfs, bound) = { let mut binder = Binder::new_for_stream(session); let bound = binder.bind_query(*query.clone())?; - (binder.included_relations(), bound) + ( + binder.included_relations().clone(), + binder.included_udfs().clone(), + bound, + ) }; let check_items = resolve_query_privileges(&bound); @@ -250,15 +255,24 @@ pub async fn gen_sink_plan( ctx.trace(sink_plan.explain_to_string()); } - let dependent_relations = - RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone()); + // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies + // during binding instead of visiting the optimized plan. + let dependencies = + RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone()) + .into_iter() + .map(|id| id.table_id() as ObjectId) + .chain( + dependent_udfs + .into_iter() + .map(|id| id.function_id() as ObjectId), + ) + .collect(); let sink_catalog = sink_desc.into_catalog( SchemaId::new(sink_schema_id), DatabaseId::new(sink_database_id), UserId::new(session.user_id()), None, // deprecated: private link connection id - dependent_relations.into_iter().collect_vec(), ); if let Some(table_catalog) = &target_table_catalog { @@ -311,6 +325,7 @@ pub async fn gen_sink_plan( sink_plan, sink_catalog, target_table_catalog, + dependencies, }) } @@ -425,12 +440,13 @@ pub async fn handle_create_sink( return Ok(resp); } - let (mut sink, graph, target_table_catalog) = { + let (mut sink, graph, target_table_catalog, dependencies) = { let SinkPlanContext { query, sink_plan: plan, sink_catalog: sink, target_table_catalog, + dependencies, } = gen_sink_plan(handle_args, stmt, None).await?; let has_order_by = !query.order_by.is_empty(); @@ -443,7 +459,7 @@ pub async fn handle_create_sink( let graph = build_graph(plan)?; - (sink, graph, target_table_catalog) + (sink, graph, target_table_catalog, dependencies) }; let mut target_table_replace_plan = None; @@ -501,7 +517,12 @@ pub async fn handle_create_sink( let catalog_writer = session.catalog_writer()?; catalog_writer - .create_sink(sink.to_proto(), graph, target_table_replace_plan) + .create_sink( + sink.to_proto(), + graph, + target_table_replace_plan, + dependencies, + ) .await?; Ok(PgResponse::empty_result(StatementType::CREATE_SINK)) diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs index 720e317bcdc96..45978ec93d306 100644 --- a/src/frontend/src/handler/extended_handle.rs +++ b/src/frontend/src/handler/extended_handle.rs @@ -173,6 +173,7 @@ pub fn handle_bind( bound, param_types, dependent_relations, + dependent_udfs, .. } = bound_result; @@ -183,6 +184,7 @@ pub fn handle_bind( param_types, parsed_params: Some(parsed_params), dependent_relations, + dependent_udfs, bound: new_bound, }; Ok(Portal::Portal(PortalResult { diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs index 8759e7c5f5917..7f0b88826fabe 100644 --- a/src/frontend/src/handler/fetch_cursor.rs +++ b/src/frontend/src/handler/fetch_cursor.rs @@ -123,7 +123,8 @@ pub async fn handle_parse( bound: BoundStatement::FetchCursor(Box::new(bound)), param_types: binder.export_param_types()?, parsed_params: None, - dependent_relations: binder.included_relations(), + dependent_relations: binder.included_relations().clone(), + dependent_udfs: binder.included_udfs().clone(), }; let result = PreparedResult { statement, diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 66a0dbcf7ca4f..9388f7762b08a 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -22,7 +22,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Format; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::Schema; +use risingwave_common::catalog::{FunctionId, Schema}; use risingwave_common::session_config::QueryMode; use risingwave_common::types::{DataType, Datum}; use risingwave_sqlparser::ast::{SetExpr, Statement}; @@ -109,6 +109,7 @@ pub async fn handle_execute( let BoundResult { bound, dependent_relations, + dependent_udfs, .. } = bound_result; let create_mv = if let BoundStatement::CreateView(create_mv) = bound { @@ -144,6 +145,7 @@ pub async fn handle_execute( name, *query, dependent_relations, + dependent_udfs, columns, emit_mode, ) @@ -184,6 +186,8 @@ pub struct BoundResult { pub(crate) param_types: Vec, pub(crate) parsed_params: Option>, pub(crate) dependent_relations: HashSet, + /// TODO(rc): merge with `dependent_relations` + pub(crate) dependent_udfs: HashSet, } fn gen_bound( @@ -207,7 +211,8 @@ fn gen_bound( bound, param_types: binder.export_param_types()?, parsed_params: None, - dependent_relations: binder.included_relations(), + dependent_relations: binder.included_relations().clone(), + dependent_udfs: binder.included_udfs().clone(), }) } diff --git a/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs b/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs index 59535ddd2b654..c26e49fada502 100644 --- a/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs @@ -21,6 +21,7 @@ use crate::optimizer::plan_node::{BatchSource, LogicalScan, StreamSource, Stream use crate::optimizer::plan_visitor::PlanVisitor; use crate::PlanRef; +/// TODO(rc): maybe we should rename this to `DependencyCollectorVisitor`. #[derive(Debug, Clone, Default)] pub struct RelationCollectorVisitor { relations: HashSet, diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 15a5281dec5e2..810724d2bac22 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::io::Write; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicU32, Ordering}; @@ -25,8 +25,9 @@ use pgwire::pg_response::StatementType; use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator}; use pgwire::types::Row; use risingwave_common::catalog::{ - FunctionId, IndexId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, - DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, + FunctionId, IndexId, ObjectId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, + DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, + RW_CATALOG_SCHEMA_NAME, }; use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat}; use risingwave_common::session_config::SessionConfig; @@ -280,6 +281,7 @@ impl CatalogWriter for MockCatalogWriter { &self, mut table: PbTable, _graph: StreamFragmentGraph, + _dependencies: HashSet, ) -> Result<()> { table.id = self.gen_id(); table.stream_job_status = PbStreamJobStatus::Created as _; @@ -310,7 +312,8 @@ impl CatalogWriter for MockCatalogWriter { table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)); } - self.create_materialized_view(table, graph).await?; + self.create_materialized_view(table, graph, HashSet::new()) + .await?; Ok(()) } @@ -341,6 +344,7 @@ impl CatalogWriter for MockCatalogWriter { sink: PbSink, graph: StreamFragmentGraph, _affected_table_change: Option, + _dependencies: HashSet, ) -> Result<()> { self.create_sink_inner(sink, graph) } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index e59c4a4100141..ad6f5ba38942e 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -24,6 +24,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; use risingwave_meta::rpc::metrics::MetaMetrics; +use risingwave_meta_model::ObjectId; use risingwave_pb::catalog::{Comment, CreateType, Secret, Table}; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; @@ -241,6 +242,7 @@ impl DdlService for DdlServiceImpl { fragment_graph, CreateType::Foreground, None, + HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbSource` )) .await?; Ok(Response::new(CreateSourceResponse { @@ -280,6 +282,11 @@ impl DdlService for DdlServiceImpl { let sink = req.get_sink()?.clone(); let fragment_graph = req.get_fragment_graph()?.clone(); let affected_table_change = req.get_affected_table_change().cloned().ok(); + let dependencies = req + .get_dependencies() + .iter() + .map(|id| *id as ObjectId) + .collect(); let stream_job = match &affected_table_change { None => StreamingJob::Sink(sink, None), @@ -295,6 +302,7 @@ impl DdlService for DdlServiceImpl { fragment_graph, CreateType::Foreground, affected_table_change.map(Self::extract_replace_table_info), + dependencies, ); let version = self.ddl_controller.run_command(command).await?; @@ -380,6 +388,11 @@ impl DdlService for DdlServiceImpl { let mview = req.get_materialized_view()?.clone(); let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground); let fragment_graph = req.get_fragment_graph()?.clone(); + let dependencies = req + .get_dependencies() + .iter() + .map(|id| *id as ObjectId) + .collect(); let stream_job = StreamingJob::MaterializedView(mview); let version = self @@ -389,6 +402,7 @@ impl DdlService for DdlServiceImpl { fragment_graph, create_type, None, + dependencies, )) .await?; @@ -442,6 +456,7 @@ impl DdlService for DdlServiceImpl { fragment_graph, CreateType::Foreground, None, + HashSet::new(), )) .await?; @@ -528,6 +543,7 @@ impl DdlService for DdlServiceImpl { fragment_graph, CreateType::Foreground, None, + HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbTable` )) .await?; diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 0c64461ab2eec..876862acb34b9 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -216,6 +216,7 @@ impl From> for PbSink { if let Some(secret_ref) = value.0.secret_ref { secret_ref_map = secret_ref.to_protobuf(); } + #[allow(deprecated)] // for `dependent_relations` Self { id: value.0.sink_id as _, schema_id: value.1.schema_id.unwrap() as _, @@ -223,7 +224,7 @@ impl From> for PbSink { name: value.0.name, columns: value.0.columns.to_protobuf(), plan_pk: value.0.plan_pk.to_protobuf(), - dependent_relations: vec![], // todo: deprecate it. + dependent_relations: vec![], distribution_key: value.0.distribution_key.0, downstream_pk: value.0.downstream_pk.0, sink_type: PbSinkType::from(value.0.sink_type) as _, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index d5ee31efae246..23eec2f8075d3 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -113,6 +113,7 @@ impl CatalogController { ctx: &StreamContext, parallelism: &Option, max_parallelism: usize, + mut dependencies: HashSet, ) -> MetaResult<()> { let inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -134,9 +135,16 @@ impl CatalogController { ) .await?; - // check if any dependent relation is in altering status. - let dependent_relations = streaming_job.dependent_relations(); - if !dependent_relations.is_empty() { + // TODO(rc): pass all dependencies uniformly, deprecate `dependent_relations` and `dependent_secret_ids`. + dependencies.extend( + streaming_job + .dependent_relations() + .into_iter() + .map(|id| id as ObjectId), + ); + + // check if any dependency is in altering status. + if !dependencies.is_empty() { let altering_cnt = ObjectDependency::find() .join( JoinType::InnerJoin, @@ -145,7 +153,7 @@ impl CatalogController { .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) .filter( object_dependency::Column::Oid - .is_in(dependent_relations.iter().map(|id| *id as ObjectId)) + .is_in(dependencies.clone()) .and(object::Column::ObjType.eq(ObjectType::Table)) .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)) .and( @@ -195,10 +203,7 @@ impl CatalogController { if let Some(target_table_id) = sink.target_table { if check_sink_into_table_cycle( target_table_id as ObjectId, - sink.dependent_relations - .iter() - .map(|id| *id as ObjectId) - .collect(), + dependencies.iter().cloned().collect(), &txn, ) .await? @@ -309,17 +314,19 @@ impl CatalogController { } } - // get dependent secrets. - let dependent_secret_ids = streaming_job.dependent_secret_ids()?; + // collect dependent secrets. + dependencies.extend( + streaming_job + .dependent_secret_ids()? + .into_iter() + .map(|secret_id| secret_id as ObjectId), + ); - let dependent_objs = dependent_relations - .iter() - .chain(dependent_secret_ids.iter()); // record object dependency. - if !dependent_secret_ids.is_empty() || !dependent_relations.is_empty() { - ObjectDependency::insert_many(dependent_objs.map(|id| { + if !dependencies.is_empty() { + ObjectDependency::insert_many(dependencies.into_iter().map(|oid| { object_dependency::ActiveModel { - oid: Set(*id as _), + oid: Set(oid), used_by: Set(streaming_job.id() as _), ..Default::default() } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 9d5b135095fb1..76d7052589cd9 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -299,13 +299,14 @@ impl StreamingJob { } } - // TODO: record all objects instead. + // TODO: to be removed, pass all objects uniformly through `dependencies` field instead. pub fn dependent_relations(&self) -> Vec { match self { StreamingJob::MaterializedView(table) => table.dependent_relations.clone(), - StreamingJob::Sink(sink, _) => sink.dependent_relations.clone(), - StreamingJob::Table(_, table, _) => table.dependent_relations.clone(), + StreamingJob::Sink(_sink, _) => vec![], /* sink dependencies are now passed via `dependencies` field in `CreateSinkRequest` */ + StreamingJob::Table(_, table, _) => table.dependent_relations.clone(), /* TODO(rc): record table dependencies via `dependencies` field */ StreamingJob::Index(index, index_table) => { + // TODO(rc): record index dependencies via `dependencies` field assert_eq!(index.primary_table_id, index_table.dependent_relations[0]); vec![] } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 5b6b7033719c4..3fd8112f43065 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::cmp::Ordering; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; @@ -137,6 +137,7 @@ pub enum DdlCommand { StreamFragmentGraphProto, CreateType, Option, + HashSet, ), DropStreamingJob(StreamingJobId, DropMode, Option), AlterName(alter_name_request::Object, String), @@ -177,7 +178,7 @@ impl DdlCommand { | DdlCommand::CommentOn(_) | DdlCommand::CreateSecret(_) | DdlCommand::AlterSwapRename(_) => true, - DdlCommand::CreateStreamingJob(_, _, _, _) + DdlCommand::CreateStreamingJob(_, _, _, _, _) | DdlCommand::CreateSourceWithoutStreamingJob(_) | DdlCommand::ReplaceTable(_) | DdlCommand::AlterSourceColumn(_) @@ -310,11 +311,13 @@ impl DdlController { fragment_graph, _create_type, affected_table_replace_info, + dependencies, ) => { ctrl.create_streaming_job( stream_job, fragment_graph, affected_table_replace_info, + dependencies, ) .await } @@ -914,6 +917,7 @@ impl DdlController { mut streaming_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, affected_table_replace_info: Option, + dependencies: HashSet, ) -> MetaResult { let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); self.metadata_manager @@ -923,6 +927,7 @@ impl DdlController { &ctx, &fragment_graph.parallelism, fragment_graph.max_parallelism as _, + dependencies, ) .await?; let job_id = streaming_job.id(); @@ -941,7 +946,6 @@ impl DdlController { .unwrap(); let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; - let id = streaming_job.id(); let name = streaming_job.name(); let definition = streaming_job.definition(); let source_id = match &streaming_job { @@ -963,7 +967,7 @@ impl DdlController { Err(err) => { tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job"); let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail { - id, + id: job_id, name, definition, error: err.as_report().to_string(), diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 80213d0deda6c..d027608e34600 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Display}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; @@ -27,7 +27,7 @@ use either::Either; use futures::stream::BoxStream; use list_rate_limits_response::RateLimitInfo; use lru::LruCache; -use risingwave_common::catalog::{FunctionId, IndexId, SecretId, TableId}; +use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::monitor::EndpointExt; @@ -392,11 +392,13 @@ impl MetaClient { &self, table: PbTable, graph: StreamFragmentGraph, + dependencies: HashSet, ) -> Result { let request = CreateMaterializedViewRequest { materialized_view: Some(table), fragment_graph: Some(graph), backfill: PbBackfillType::Regular as _, + dependencies: dependencies.into_iter().collect(), }; let resp = self.inner.create_materialized_view(request).await?; // TODO: handle error in `resp.status` here @@ -442,11 +444,13 @@ impl MetaClient { sink: PbSink, graph: StreamFragmentGraph, affected_table_change: Option, + dependencies: HashSet, ) -> Result { let request = CreateSinkRequest { sink: Some(sink), fragment_graph: Some(graph), affected_table_change, + dependencies: dependencies.into_iter().collect(), }; let resp = self.inner.create_sink(request).await?;