Skip to content

Commit edb241f

Browse files
author
Sean Loiselle
committed
sql: add CREATE SUBSOURCE options for data sources
Subsources need to persist whether or not they are progress subsources, which will ultimately determine their initial read policy in the storage controller. To make this unambiguous, also include a type for the current reference-style subsources.
1 parent 0c6d9d6 commit edb241f

File tree

8 files changed

+170
-32
lines changed

8 files changed

+170
-32
lines changed

src/adapter/src/catalog/migrate.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ use semver::Version;
1111
use std::collections::BTreeMap;
1212

1313
use mz_ore::collections::CollectionExt;
14-
use mz_sql::ast::Raw;
1514
use mz_sql::ast::{
16-
display::AstDisplay, CreateSourceStatement, CreateSourceSubsource, DeferredObjectName,
17-
RawObjectName, ReferencedSubsources, Statement, UnresolvedObjectName,
15+
display::AstDisplay, CreateSourceStatement, CreateSourceSubsource, CreateSubsourceOptionName,
16+
CreateSubsourceStatement, DeferredObjectName, Raw, RawObjectName, ReferencedSubsources,
17+
Statement, UnresolvedObjectName,
1818
};
1919

2020
use crate::catalog::{Catalog, ConnCatalog, SerializedCatalogItem, SYSTEM_CONN_ID};
@@ -51,7 +51,10 @@ pub(crate) async fn migrate(catalog: &mut Catalog) -> Result<(), anyhow::Error>
5151
};
5252
let mut tx = storage.transaction().await?;
5353
// First, do basic AST -> AST transformations.
54-
rewrite_items(&mut tx, |_stmt| Ok(()))?;
54+
rewrite_items(&mut tx, |stmt| {
55+
subsource_type_option_rewrite(stmt);
56+
Ok(())
57+
})?;
5558

5659
// Then, load up a temporary catalog with the rewritten items, and perform
5760
// some transformations that require introspecting the catalog. These
@@ -89,6 +92,27 @@ pub(crate) async fn migrate(catalog: &mut Catalog) -> Result<(), anyhow::Error>
8992
// AST migrations -- Basic AST -> AST transformations
9093
// ****************************************************************************
9194

95+
// Mark all current subsources as "references" subsources in anticipation of
96+
// adding "progress" subsources.
97+
// TODO: delete in version v0.45 (released in v0.43 + 1 additional release)
98+
fn subsource_type_option_rewrite(stmt: &mut mz_sql::ast::Statement<Raw>) {
99+
if let Statement::CreateSubsource(CreateSubsourceStatement { with_options, .. }) = stmt {
100+
if !with_options.iter().any(|option| {
101+
matches!(
102+
option.name,
103+
CreateSubsourceOptionName::Progress | CreateSubsourceOptionName::References
104+
)
105+
}) {
106+
with_options.push(mz_sql::ast::CreateSubsourceOption {
107+
name: CreateSubsourceOptionName::References,
108+
value: Some(mz_sql::ast::WithOptionValue::Value(
109+
mz_sql::ast::Value::Boolean(true),
110+
)),
111+
});
112+
}
113+
}
114+
}
115+
92116
// ****************************************************************************
93117
// Semantic migrations -- Weird migrations that require access to the catalog
94118
// ****************************************************************************

src/sql-parser/src/ast/defs/statement.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,13 +659,50 @@ impl<T: AstInfo> AstDisplay for ReferencedSubsources<T> {
659659
}
660660
impl_display_t!(ReferencedSubsources);
661661

662+
/// An option in a `CREATE SUBSOURCE` statement.
663+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
664+
pub enum CreateSubsourceOptionName {
665+
Progress,
666+
References,
667+
}
668+
669+
impl AstDisplay for CreateSubsourceOptionName {
670+
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
671+
match self {
672+
CreateSubsourceOptionName::Progress => {
673+
f.write_str("PROGRESS");
674+
}
675+
CreateSubsourceOptionName::References => {
676+
f.write_str("REFERENCES");
677+
}
678+
}
679+
}
680+
}
681+
682+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
683+
pub struct CreateSubsourceOption<T: AstInfo> {
684+
pub name: CreateSubsourceOptionName,
685+
pub value: Option<WithOptionValue<T>>,
686+
}
687+
688+
impl<T: AstInfo> AstDisplay for CreateSubsourceOption<T> {
689+
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
690+
f.write_node(&self.name);
691+
if let Some(v) = &self.value {
692+
f.write_str(" = ");
693+
f.write_node(v);
694+
}
695+
}
696+
}
697+
662698
/// `CREATE SUBSOURCE`
663699
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
664700
pub struct CreateSubsourceStatement<T: AstInfo> {
665701
pub name: UnresolvedObjectName,
666702
pub columns: Vec<ColumnDef<T>>,
667703
pub constraints: Vec<TableConstraint<T>>,
668704
pub if_not_exists: bool,
705+
pub with_options: Vec<CreateSubsourceOption<T>>,
669706
}
670707

671708
impl<T: AstInfo> AstDisplay for CreateSubsourceStatement<T> {
@@ -682,6 +719,12 @@ impl<T: AstInfo> AstDisplay for CreateSubsourceStatement<T> {
682719
f.write_node(&display::comma_separated(&self.constraints));
683720
}
684721
f.write_str(")");
722+
723+
if !self.with_options.is_empty() {
724+
f.write_str(" WITH (");
725+
f.write_node(&display::comma_separated(&self.with_options));
726+
f.write_str(")");
727+
}
685728
}
686729
}
687730
impl_display_t!(CreateSubsourceStatement);

src/sql-parser/src/parser.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2392,14 +2392,44 @@ impl<'a> Parser<'a> {
23922392

23932393
let (columns, constraints) = self.parse_columns(Mandatory)?;
23942394

2395+
let with_options = if self.parse_keyword(WITH) {
2396+
self.expect_token(&Token::LParen)?;
2397+
let options = self.parse_comma_separated(Parser::parse_create_subsource_option)?;
2398+
self.expect_token(&Token::RParen)?;
2399+
options
2400+
} else {
2401+
vec![]
2402+
};
2403+
23952404
Ok(Statement::CreateSubsource(CreateSubsourceStatement {
23962405
name,
23972406
if_not_exists,
23982407
columns,
23992408
constraints,
2409+
with_options,
24002410
}))
24012411
}
24022412

2413+
/// Parse the name of a CREATE SINK optional parameter
2414+
fn parse_create_subsource_option_name(
2415+
&mut self,
2416+
) -> Result<CreateSubsourceOptionName, ParserError> {
2417+
let name = match self.expect_one_of_keywords(&[PROGRESS, REFERENCES])? {
2418+
PROGRESS => CreateSubsourceOptionName::Progress,
2419+
REFERENCES => CreateSubsourceOptionName::References,
2420+
_ => unreachable!(),
2421+
};
2422+
Ok(name)
2423+
}
2424+
2425+
/// Parse a NAME = VALUE parameter for CREATE SINK
2426+
fn parse_create_subsource_option(&mut self) -> Result<CreateSubsourceOption<Raw>, ParserError> {
2427+
Ok(CreateSubsourceOption {
2428+
name: self.parse_create_subsource_option_name()?,
2429+
value: self.parse_optional_option_value()?,
2430+
})
2431+
}
2432+
24032433
fn parse_create_source(&mut self) -> Result<Statement<Raw>, ParserError> {
24042434
self.expect_keyword(SOURCE)?;
24052435
let if_not_exists = self.parse_if_not_exists()?;

0 commit comments

Comments
 (0)