Skip to content

Commit

Permalink
refactor: further clarify types of tables (#19539)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Nov 29, 2024
1 parent 3b3a1c5 commit d6cf971
Show file tree
Hide file tree
Showing 13 changed files with 27 additions and 88 deletions.
8 changes: 0 additions & 8 deletions src/common/src/catalog/internal_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@

use std::any::type_name;
use std::fmt::Debug;
use std::sync::LazyLock;

use anyhow::anyhow;
use itertools::Itertools;
use regex::Regex;

pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";

Expand All @@ -37,12 +35,6 @@ pub fn generate_internal_table_name_with_type(
)
}

pub fn valid_table_name(table_name: &str) -> bool {
static INTERNAL_TABLE_NAME: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"__internal_.*_\d+").unwrap());
!INTERNAL_TABLE_NAME.is_match(table_name)
}

pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
dist_key_indices: &[I],
pk_indices: &[I],
Expand Down
2 changes: 0 additions & 2 deletions src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ pub enum CatalogError {
NotFound(&'static str, String),
#[error("{0} with name {1} exists")]
Duplicated(&'static str, String),
#[error("cannot drop {0} {1} because {2} {3} depend on it")]
NotEmpty(&'static str, String, &'static str, String),
}

impl From<CatalogError> for RwError {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ impl Catalog {
for database in self.database_by_name.values() {
if !found {
for schema in database.iter_schemas() {
if schema.iter_table().any(|t| t.id() == *table_id) {
if schema.iter_user_table().any(|t| t.id() == *table_id) {
found = true;
database_id = database.id();
schema_id = schema.id();
Expand Down
33 changes: 14 additions & 19 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::{valid_table_name, FunctionId, IndexId, StreamJobStatus, TableId};
use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::SinkCatalog;
pub use risingwave_expr::sig::*;
Expand Down Expand Up @@ -46,7 +46,9 @@ pub struct SchemaCatalog {
id: SchemaId,
pub name: String,
pub database_id: DatabaseId,
/// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
table_by_name: HashMap<String, Arc<TableCatalog>>,
/// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
table_by_id: HashMap<TableId, Arc<TableCatalog>>,
source_by_name: HashMap<String, Arc<SourceCatalog>>,
source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
Expand Down Expand Up @@ -564,40 +566,33 @@ impl SchemaCatalog {
self.table_by_name.values()
}

pub fn iter_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
self.table_by_name
.iter()
.filter(|(_, v)| v.is_table())
.map(|(_, v)| v)
pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
self.table_by_name.values().filter(|v| v.is_user_table())
}

pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
self.table_by_name
.iter()
.filter(|(_, v)| v.is_internal_table())
.map(|(_, v)| v)
.values()
.filter(|v| v.is_internal_table())
}

pub fn iter_valid_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
/// Iterate all non-internal tables, including user tables, materialized views and indices.
pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
self.table_by_name
.iter()
.filter_map(|(key, v)| valid_table_name(key).then_some(v))
.values()
.filter(|v| !v.is_internal_table())
}

/// Iterate all materialized views, excluding the indices.
pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
self.table_by_name
.iter()
.filter(|(_, v)| v.is_mview() && valid_table_name(&v.name))
.map(|(_, v)| v)
self.table_by_name.values().filter(|v| v.is_mview())
}

/// Iterate created materialized views, excluding the indices.
pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
self.table_by_name
.iter()
.filter(|(_, v)| v.is_mview() && valid_table_name(&v.name) && v.is_created())
.map(|(_, v)| v)
.values()
.filter(|v| v.is_mview() && v.is_created())
}

/// Iterate all indices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ fn read_pg_constraint_in_schema(schema: &SchemaCatalog) -> Vec<PgConstraint> {
.map(|table| PgConstraint::from_system_table(schema, table.as_ref()));

let table_rows = schema
.iter_valid_table()
.iter_table_mv_indices()
.map(|table| PgConstraint::from_table(schema, table.as_ref()));

system_table_rows.chain(table_rows).collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn read_rw_columns_in_schema(schema: &SchemaCatalog) -> Vec<RwColumn> {
})
});

let table_rows = schema.iter_valid_table().flat_map(|table| {
let table_rows = schema.iter_table_mv_indices().flat_map(|table| {
let schema = table.column_schema();
table
.columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwDescription>> {

Ok(schemas
.flat_map(|schema| {
schema.iter_table().flat_map(|table| {
schema.iter_user_table().flat_map(|table| {
iter::once(build_row(
table.id.table_id as _,
rw_tables_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
table_ids.push(t.id.table_id);
});

schema_catalog.iter_table().for_each(|t| {
schema_catalog.iter_user_table().for_each(|t| {
table_ids.push(t.id.table_id);
});

Expand Down Expand Up @@ -100,7 +100,7 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
}
});

schema_catalog.iter_table().for_each(|t| {
schema_catalog.iter_user_table().for_each(|t| {
if let Some(fragments) = table_fragments.get(&t.id.table_id) {
rows.push(RwRelationInfo {
schemaname: schema.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn read_rw_table_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwTable>> {

Ok(schemas
.flat_map(|schema| {
schema.iter_table().map(|table| RwTable {
schema.iter_user_table().map(|table| RwTable {
id: table.id.table_id as i32,
name: table.name().to_string(),
schema_id: schema.id() as i32,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl TableCatalog {
self.table_type
}

pub fn is_table(&self) -> bool {
pub fn is_user_table(&self) -> bool {
self.table_type == TableType::Table
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_swap_rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub async fn handle_swap_rename(
))
.into());
}
if stmt_type == StatementType::ALTER_TABLE && !src_table.is_table() {
if stmt_type == StatementType::ALTER_TABLE && !src_table.is_user_table() {
return Err(CatalogError::NotFound("table", src_obj_name.to_string()).into());
} else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() {
return Err(
Expand Down
50 changes: 2 additions & 48 deletions src/frontend/src/handler/drop_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use risingwave_sqlparser::ast::{DropMode, ObjectName};

use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::CatalogError;
use crate::error::{ErrorCode, Result};
use crate::handler::HandlerArgs;

Expand Down Expand Up @@ -62,24 +61,8 @@ pub async fn handle_drop_schema(
};
match mode {
Some(DropMode::Restrict) | None => {
if let Some(table) = schema.iter_table().next() {
return Err(CatalogError::NotEmpty(
"schema",
schema_name,
"table",
table.name.clone(),
)
.into());
}
if let Some(source) = schema.iter_source().next() {
return Err(CatalogError::NotEmpty(
"schema",
schema_name,
"source",
source.name.clone(),
)
.into());
}
// Note: we don't check if the schema is empty here.
// The check is done in meta `ensure_schema_empty`.
}
Some(DropMode::Cascade) => {
bail_not_implemented!(issue = 6773, "drop schema with cascade mode");
Expand All @@ -92,32 +75,3 @@ pub async fn handle_drop_schema(
catalog_writer.drop_schema(schema.id()).await?;
Ok(PgResponse::empty_result(StatementType::DROP_SCHEMA))
}

#[cfg(test)]
mod tests {
use crate::test_utils::LocalFrontend;

#[tokio::test]
async fn test_drop_schema() {
let frontend = LocalFrontend::new(Default::default()).await;
let session = frontend.session_ref();
let catalog_reader = session.env().catalog_reader();

frontend.run_sql("CREATE SCHEMA schema").await.unwrap();

frontend.run_sql("CREATE TABLE schema.table").await.unwrap();

assert!(frontend.run_sql("DROP SCHEMA schema").await.is_err());

frontend.run_sql("DROP TABLE schema.table").await.unwrap();

frontend.run_sql("DROP SCHEMA schema").await.unwrap();

let schema = catalog_reader
.read_guard()
.get_database_by_name("schema")
.ok()
.cloned();
assert!(schema.is_none());
}
}
4 changes: 2 additions & 2 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ pub async fn handle_show_object(
.get_schema_by_name(session.database(), schema.as_ref())
{
table_names_in_schema
.extend(schema_catalog.iter_table().map(|t| t.name.clone()));
.extend(schema_catalog.iter_user_table().map(|t| t.name.clone()));
}
}

Expand Down Expand Up @@ -646,7 +646,7 @@ pub fn handle_show_create_object(
ShowCreateType::Table => {
let table = schema
.get_created_table_by_name(&object_name)
.filter(|t| t.is_table())
.filter(|t| t.is_user_table())
.ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
table.create_sql()
}
Expand Down

0 comments on commit d6cf971

Please sign in to comment.