diff --git a/Cargo.lock b/Cargo.lock index 88102bb3fdc8..6ce7a67779e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1952,6 +1952,7 @@ dependencies = [ "datafusion-proto", "env_logger", "futures", + "itertools 0.14.0", "log", "mimalloc", "nix", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index feafa48b3954..53ba9da35ef1 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -65,6 +65,7 @@ datafusion = { workspace = true, default-features = true, features = ["avro"] } datafusion-proto = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } +itertools = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } diff --git a/datafusion-examples/examples/metadata_columns.rs b/datafusion-examples/examples/metadata_columns.rs new file mode 100644 index 000000000000..8ebb9abf0d4a --- /dev/null +++ b/datafusion-examples/examples/metadata_columns.rs @@ -0,0 +1,314 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::fmt::{self, Debug, Formatter}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use arrow::array::{ArrayRef, StringArray, UInt64Array}; +use async_trait::async_trait; +use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result; +use datafusion::execution::context::TaskContext; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::memory::MemoryStream; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, +}; + +use datafusion::prelude::*; + +use datafusion::catalog::Session; +use datafusion::common::FieldId; +use itertools::Itertools; +use tokio::time::timeout; + +/// This example demonstrates executing a simple query against a custom datasource +#[tokio::main] +async fn main() -> Result<()> { + // create our custom datasource and adding some users + let db = CustomDataSource::default(); + db.populate_users(); + + search_accounts(db.clone(), "select * from accounts", 3).await?; + search_accounts( + db.clone(), + "select _rowid, _file, * from accounts where _rowid > 1", + 1, + ) + .await?; + search_accounts( + db.clone(), + "select _rowid, _file, * from accounts where _file = 'file-0'", + 1, + ) + .await?; + + Ok(()) +} + +async fn search_accounts( + db: CustomDataSource, + sql: &str, + expected_result_length: usize, +) -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); + ctx.register_table("accounts", Arc::new(db)).unwrap(); + let options = SQLOptions::new().with_allow_ddl(false); + + timeout(Duration::from_secs(10), async move { + let dataframe = ctx.sql_with_options(sql, options).await.unwrap(); + let result = dataframe.collect().await.unwrap(); + let record_batch = result.first().unwrap(); + + assert_eq!(expected_result_length, record_batch.column(1).len()); + dbg!(record_batch.columns()); + }) + .await + .unwrap(); + + Ok(()) +} + +/// A User, with an id and a bank account +#[derive(Clone, Debug)] +struct User { + id: u8, + bank_account: u64, +} + +/// A custom datasource, used to represent a datastore with a single index +#[derive(Clone)] +pub struct CustomDataSource { + inner: Arc>, + metadata_columns: SchemaRef, +} + +struct CustomDataSourceInner { + data: Vec, +} + +impl Debug for CustomDataSource { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.write_str("custom_db") + } +} + +impl CustomDataSource { + pub(crate) async fn create_physical_plan( + &self, + projections: Option<&Vec>, + ) -> Result> { + Ok(Arc::new(CustomExec::new(projections, self.clone()))) + } + + pub(crate) fn populate_users(&self) { + self.add_user(User { + id: 1, + bank_account: 9_000, + }); + self.add_user(User { + id: 2, + bank_account: 100, + }); + self.add_user(User { + id: 3, + bank_account: 1_000, + }); + } + + fn add_user(&self, user: User) { + let mut inner = self.inner.lock().unwrap(); + inner.data.push(user); + } +} + +impl Default for CustomDataSource { + fn default() -> Self { + CustomDataSource { + inner: Arc::new(Mutex::new(CustomDataSourceInner { + data: Default::default(), + })), + metadata_columns: Arc::new(Schema::new(vec![ + Field::new("_rowid", DataType::UInt64, false), + Field::new("_file", 
DataType::Utf8, false), + ])), + } + } +} + +#[async_trait] +impl TableProvider for CustomDataSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + SchemaRef::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, false), + Field::new("bank_account", DataType::UInt64, true), + ])) + } + + fn metadata_columns(&self) -> Option { + Some(self.metadata_columns.clone()) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[Expr], + _limit: Option, + ) -> Result> { + return self.create_physical_plan(projection).await; + } +} + +#[derive(Debug, Clone)] +struct CustomExec { + db: CustomDataSource, + projected_schema: SchemaRef, + cache: PlanProperties, +} + +impl CustomExec { + fn new(projections: Option<&Vec>, db: CustomDataSource) -> Self { + let schema = db.schema(); + let metadata_schema = db.metadata_columns(); + let projected_schema = match projections { + Some(projection) => { + let projection = projection + .iter() + .map(|idx| match FieldId::from(*idx) { + FieldId::Normal(i) => Arc::new(schema.field(i).clone()), + FieldId::Metadata(i) => { + Arc::new(metadata_schema.as_ref().unwrap().field(i).clone()) + } + }) + .collect_vec(); + Arc::new(Schema::new(projection)) + } + None => schema, + }; + let cache = Self::compute_properties(projected_schema.clone()); + Self { + db, + projected_schema, + cache, + } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties(schema: SchemaRef) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + PlanProperties::new( + eq_properties, + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +impl DisplayAs for CustomExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + write!(f, "CustomExec") + } +} + +impl ExecutionPlan for CustomExec { + fn name(&self) -> &'static str { + "CustomExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + let users: Vec = { + let db = self.db.inner.lock().unwrap(); + db.data.clone() + }; + + let mut id_array = UInt8Builder::with_capacity(users.len()); + let mut account_array = UInt64Builder::with_capacity(users.len()); + let len = users.len() as u64; + + for user in users { + id_array.append_value(user.id); + account_array.append_value(user.bank_account); + } + + let id_array = id_array.finish(); + let account_array = account_array.finish(); + let rowid_array = UInt64Array::from_iter_values(0_u64..len); + let file_array = + StringArray::from_iter_values((0_u64..len).map(|i| format!("file-{}", i))); + + let arrays = self + .projected_schema + .fields + .iter() + .map(|f| match f.name().as_str() { + "_rowid" => Arc::new(rowid_array.clone()) as ArrayRef, + "id" => Arc::new(id_array.clone()) as ArrayRef, + "bank_account" => Arc::new(account_array.clone()) as ArrayRef, + "_file" => Arc::new(file_array.clone()) as ArrayRef, + _ => panic!("cannot reach here"), + }) + .collect(); + + 
Ok(Box::pin(MemoryStream::try_new( + vec![RecordBatch::try_new(self.projected_schema.clone(), arrays)?], + self.schema(), + None, + )?)) + } +} diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index ecc792f73d30..6ca81368109b 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -56,6 +56,51 @@ pub trait TableProvider: Debug + Sync + Send { /// Get a reference to the schema for this table fn schema(&self) -> SchemaRef; + /// Return a reference to the schema for metadata columns. + /// + /// Metadata columns are columns which are meant to be semi-public stores of the internal details of the table. + /// For example, `ctid` in Postgres would be considered a metadata column + /// (Postgres calls these "system columns", see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples). + /// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)). + /// + /// You can use this method to declare which columns in the table are "metadata" columns. + /// See `datafusion/core/tests/sql/metadata_columns.rs` for an example of this in action. + /// + /// As an example of how this works in practice, if you have the following Postgres table: + /// + /// ```sql + /// CREATE TABLE t (x int); + /// INSERT INTO t VALUES (1); + /// ``` + /// + /// And you do a `SELECT * FROM t`, you would get the following schema: + /// + /// ```text + /// +---+ + /// | x | + /// +---+ + /// | 1 | + /// +---+ + /// ``` + /// + /// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example): + /// + /// ```text + /// +-----+---+ + /// | ctid| x | + /// +-----+---+ + /// | 0 | 1 | + /// +-----+---+ + /// ``` + /// + /// Returns: + /// - `None` for tables that do not have metadata columns. + /// - `Some(SchemaRef)` for tables having metadata columns. + /// The returned schema should be the schema of _only_ the metadata columns, not the full schema. + fn metadata_columns(&self) -> Option<SchemaRef> { + None + } + /// Get a reference to the constraints of the table. /// Returns: /// - `None` for tables that do not support constraints. diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 99fb179c76a3..6be5446a0007 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result, _plan_err, _schema_err}; use crate::{ field_not_found, unqualified_field_not_found, Column, FunctionalDependencies, - SchemaError, TableReference, + JoinType, SchemaError, TableReference, }; use arrow::compute::can_cast_types; @@ -104,42 +104,448 @@ pub type DFSchemaRef = Arc<DFSchema>; /// let schema = Schema::from(df_schema); /// assert_eq!(schema.fields().len(), 1); /// ``` +/// +/// DFSchema also supports metadata columns. +/// Metadata columns are columns which are meant to be semi-public stores of the internal details of the table. +/// For example, the [`ctid` column in Postgres](https://www.postgresql.org/docs/current/ddl-system-columns.html) +/// or the [`_metadata` column in Spark](https://docs.databricks.com/en/ingestion/file-metadata-column.html). +/// +/// These columns are stored in a separate schema from the main schema, which can be accessed using [DFSchema::metadata_schema].
+/// To build a schema with metadata columns, create the ordinary fields with [DFSchema::new_with_metadata] and attach the metadata columns with [DFSchema::with_metadata_schema]: +/// ```rust +/// use datafusion_common::{DFSchema, Column, QualifiedSchema, TableReference}; +/// use arrow::datatypes::{DataType, Field, Schema}; +/// use std::collections::HashMap; +/// +/// let schema = Schema::new(vec![ +/// Field::new("c1", DataType::Int32, false), +/// ]); +/// let metadata_schema = Schema::new(vec![ +/// Field::new("file", DataType::Utf8, false), +/// ]); +/// +/// let df_schema = DFSchema::new_with_metadata( +/// vec![(None, Field::new("c1", DataType::Int32, false).into())], HashMap::new()).unwrap(); +/// let df_schema = df_schema.with_metadata_schema( +/// Some(QualifiedSchema::new_with_table(metadata_schema.into(), &TableReference::from("t")))); +/// ``` + #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFSchema { + inner: QualifiedSchema, + /// Stores functional dependencies in the schema. + functional_dependencies: FunctionalDependencies, + /// Metadata columns are data columns for a table that are not in the table schema. + /// For example, a file source could expose a "file" column that contains the path of the file that contained each row. + /// See Also: [Spark SupportsMetadataColumns]: + metadata: Option<QualifiedSchema>, +} + +/// The starting point of the metadata column index. +/// If an index is less than this value, then this index is for an ordinary column. +/// If it is greater than or equal to this value, then this index is for a metadata column. +const METADATA_OFFSET: usize = usize::MAX >> 1; + +/// Represents a field identifier in a schema that can be either a normal field or a metadata field. +/// +/// DataFusion schemas can contain both normal data columns and metadata columns. This enum +/// helps distinguish between the two types when referencing fields by index. +/// +/// # Examples +/// ```rust +/// use datafusion_common::FieldId; +/// +/// // Create a normal field ID +/// let normal = FieldId::Normal(5); +/// +/// // Create a metadata field ID +/// let metadata = FieldId::Metadata(2); +/// ``` +/// +pub enum FieldId { + Metadata(usize), + Normal(usize), +} + +impl From<usize> for FieldId { + fn from(index: usize) -> Self { + if index >= METADATA_OFFSET { + FieldId::Metadata(index - METADATA_OFFSET) + } else { + FieldId::Normal(index) + } + } +} + +impl From<FieldId> for usize { + fn from(value: FieldId) -> Self { + match value { + FieldId::Metadata(id) => id + METADATA_OFFSET, + FieldId::Normal(id) => id, + } + } +} + +/// QualifiedSchema wraps an Arrow schema and field qualifiers. +/// Some fields may be qualified and some unqualified. +/// A qualified field is a field that has a relation name associated with it. +/// For example, a qualified field would be `table_name.column_name` and an unqualified field would be just `column_name`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct QualifiedSchema { /// Inner Arrow schema reference. - inner: SchemaRef, + schema: SchemaRef, /// Optional qualifiers for each column in this schema. In the same order as /// the `self.inner.fields()` field_qualifiers: Vec<Option<TableReference>>, - /// Stores functional dependencies in the schema. - functional_dependencies: FunctionalDependencies, } -impl DFSchema { - /// Creates an empty `DFSchema` +/// A table schema that holds not just column names but also the name of the table they belong to. +/// For example, consider `table_name.column_name` (qualified) vs. just `column_name` (unqualified). +impl QualifiedSchema { + /// Creates an empty `QualifiedSchema`. pub fn empty() -> Self { Self { - inner: Arc::new(Schema::new([])), + schema: Arc::new(Schema::new([])), field_qualifiers: vec![], + } + } + + /// Creates a new `QualifiedSchema` from an Arrow schema and a list of table references.
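An easy way to see how the `METADATA_OFFSET` encoding above behaves is to round-trip a few indexes through `FieldId`. The following is a minimal sketch, assuming only the `From` conversions exactly as they appear in this patch:

```rust
use datafusion_common::FieldId;

fn main() {
    // Ordinary columns keep their ordinal position when converted to an index.
    let normal: usize = FieldId::Normal(1).into();
    assert_eq!(normal, 1);

    // Metadata columns are shifted by METADATA_OFFSET (usize::MAX >> 1), so their
    // indexes can never collide with ordinary column indexes.
    let meta: usize = FieldId::Metadata(0).into();
    assert!(meta >= usize::MAX >> 1);

    // Converting back recovers the original variant and offset.
    assert!(matches!(FieldId::from(meta), FieldId::Metadata(0)));
    assert!(matches!(FieldId::from(normal), FieldId::Normal(1)));
}
```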
+ /// The table references must be of the same length as the fields in the schema and + /// follow the same order. + pub fn new( + schema: SchemaRef, + field_qualifiers: Vec>, + ) -> Result { + if schema.fields().len() != field_qualifiers.len() { + return _schema_err!(SchemaError::UnmatchedFieldQualifiers { + field_count: schema.fields().len(), + qualifier_count: field_qualifiers.len(), + }); + } + Ok(QualifiedSchema { + schema, + field_qualifiers, + }) + } + + /// Create a new `QualifiedSchema` from a list of Arrow [Field]s where they all share the same [TableReference]. + /// + /// For example, to create a schema for a table with all fields qualified by `table_name`: + /// ```rust + /// use std::sync::Arc; + /// use datafusion_common::{QualifiedSchema, TableReference}; + /// use arrow::datatypes::{DataType, Field, Schema}; + /// let schema = Arc::new(Schema::new(vec![ + /// Field::new("c1", DataType::Int32, false), + /// ])); + /// let table_name = TableReference::from("table_name"); + /// let qualified_schema = QualifiedSchema::new_with_table(schema, &table_name); + /// ``` + /// + /// To create a schema where fields have different qualifiers, use [QualifiedSchema::new]. + pub fn new_with_table(schema: SchemaRef, table_name: &TableReference) -> Self { + let field_qualifiers = schema + .fields() + .iter() + .map(|_| Some(table_name.clone())) + .collect(); + Self::new(schema, field_qualifiers) + .expect("field qualifier length should match schema") + } + + /// Get a reference to the underlying Arrow Schema + pub fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + /// Checks if the schema is empty. + /// + /// Returns: + /// - `true` if the schema has no fields + /// - `false` if it has any fields, qualified or unqualified + pub fn is_empty(&self) -> bool { + self.schema.fields.is_empty() + } + + /// Returns the number of fields in the schema, be they qualified or unqualified. + pub fn len(&self) -> usize { + self.schema.fields.len() + } + + /// Look up the field by it's unqualified name. + /// + /// This returns a Vec of fields and their qualifier for any field that have the given unqualified name. + /// For example, given the fields `table1.a`, `table1.b` and `table2.a` if you search for `a` you will get `table1.a` and `table2.a` + /// as [(`table1`, `a`), (`table2`, `a`)]. + pub fn qualified_fields_with_unqualified_name( + &self, + name: &str, + ) -> Vec<(Option<&TableReference>, &Field)> { + self.iter() + .filter(|(_, field)| field.name() == name) + .map(|(qualifier, field)| (qualifier, field.as_ref())) + .collect() + } + + /// Iterate over the qualifiers and fields in the DFSchema. + pub fn iter(&self) -> impl Iterator, &FieldRef)> { + self.field_qualifiers + .iter() + .zip(self.schema.fields().iter()) + .map(|(qualifier, field)| (qualifier.as_ref(), field)) + } + + /// Similar to [Self::qualified_fields_with_unqualified_name] but discards the qualifier in the result. 
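For readers new to `QualifiedSchema`, a small sketch of its lookup helpers may help. It assumes only the constructors and accessors added in this file (`new_with_table`, `qualified_fields_with_unqualified_name`, `field_with_qualified_name`); the table names are illustrative:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{QualifiedSchema, TableReference};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    // Qualify every field with the same table reference.
    let qualified = QualifiedSchema::new_with_table(schema, &TableReference::from("t1"));

    // Unqualified lookup returns every (qualifier, field) pair with that name.
    let hits = qualified.qualified_fields_with_unqualified_name("id");
    assert_eq!(hits.len(), 1);

    // Qualified lookup also requires the table reference to match.
    assert!(qualified
        .field_with_qualified_name(&TableReference::from("t1"), "name")
        .is_some());
    assert!(qualified
        .field_with_qualified_name(&TableReference::from("t2"), "name")
        .is_none());
}
```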
+ pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> { + self.fields() + .iter() + .filter(|field| field.name() == name) + .map(|f| f.as_ref()) + .collect() + } + + /// Get a list of fields + pub fn fields(&self) -> &Fields { + &self.schema.fields + } + + /// Returns an immutable reference of a specific `Field` instance selected using an + /// offset within the internal `fields` vector + pub fn field(&self, i: usize) -> &Field { + &self.schema.fields[i] + } + + /// Returns an immutable reference to a specific `Field` and it's qualifier using an + /// offset within the internal `fields` vector and its qualifier + pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) { + (self.field_qualifiers[i].as_ref(), self.field(i)) + } + + /// Search for a field using it's qualified name. + /// + /// This will return the field if it exists, otherwise it will return `None`. + /// + /// For example, given the fields `table1.a`, `table1.b` and `table2.a` if you search for (`table1`, `a`) you will get the [Field] for `a` back. + pub fn field_with_qualified_name( + &self, + qualifier: &TableReference, + name: &str, + ) -> Option<&Field> { + let mut matches = self + .iter() + .filter(|(q, f)| match q { + Some(field_q) => qualifier.resolved_eq(field_q) && f.name() == name, + None => false, + }) + .map(|(_, f)| f.as_ref()); + matches.next() + } + + /// Get the internal index of a column using it's unqualified name. + /// If multiple columns have the same unqualified name, the index of the first one is returned. + /// If no column is found, `None` is returned. + /// This index can be used to access the column via [Self::field] or [Self::qualified_field]. + pub fn index_of_column_by_name( + &self, + qualifier: Option<&TableReference>, + name: &str, + ) -> Option { + let mut matches = self + .iter() + .enumerate() + .filter(|(_, (q, f))| match (qualifier, q) { + // field to lookup is qualified. + // current field is qualified and not shared between relations, compare both + // qualifier and name. + (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name, + // field to lookup is qualified but current field is unqualified. + (Some(_), None) => false, + // field to lookup is unqualified, no need to compare qualifier + (None, Some(_)) | (None, None) => f.name() == name, + }) + .map(|(idx, _)| idx); + matches.next() + } + + /// Get only the qualifier of a field using it's internal index. + pub fn field_qualifier(&self, i: usize) -> Option<&TableReference> { + self.field_qualifiers[i].as_ref() + } + /// Join two qualified schemas together by concatenating their fields. + /// + /// This method creates a new schema by combining the fields from `self` followed by the fields from `schema`. + /// The metadata from both schemas is also merged. + /// + /// # Arguments + /// + /// * `schema` - The schema to join with this schema + /// + /// # Returns + /// + /// Returns a new `QualifiedSchema` containing all fields from both schemas, or an error if the join fails. 
+ /// + /// # Example + /// + /// ```rust + /// use datafusion_common::{DFSchema, QualifiedSchema, TableReference}; + /// use arrow::datatypes::{DataType, Field, Schema}; + /// use std::sync::Arc; + /// + /// let schema1 = QualifiedSchema::new_with_table( + /// Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])), + /// &TableReference::from("t1") + /// ); + /// + /// let schema2 = QualifiedSchema::new_with_table( + /// Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)])), + /// &TableReference::from("t2") + /// ); + /// + /// let joined = schema1.join(&schema2).unwrap(); + /// assert_eq!(joined.len(), 2); + /// ``` + pub fn join(&self, schema: &QualifiedSchema) -> Result { + let mut schema_builder = SchemaBuilder::new(); + schema_builder.extend(self.fields().iter().cloned()); + schema_builder.extend(schema.fields().iter().cloned()); + let new_schema = schema_builder.finish(); + + let mut new_metadata: HashMap = self.schema.metadata.clone(); + new_metadata.extend(schema.schema.metadata.clone()); + let new_schema_with_metadata = new_schema.with_metadata(new_metadata); + + let mut new_qualifiers = self.field_qualifiers.clone(); + new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice()); + + QualifiedSchema::new(Arc::new(new_schema_with_metadata), new_qualifiers) + } + + /// Merge another schema into this schema, ignoring any duplicate fields. + /// + /// This method modifies the current schema by appending non-duplicate fields from the other schema. + /// Fields are considered duplicates if: + /// - For qualified fields: they have the same qualifier and field name + /// - For unqualified fields: they have the same field name + /// + /// # Arguments + /// + /// * `other_schema` - The schema to merge into this one + /// + /// # Example + /// + /// ``` + /// use datafusion_common::{DFSchema, QualifiedSchema, TableReference}; + /// use arrow::datatypes::{DataType, Field, Schema}; + /// use std::sync::Arc; + /// + /// let mut schema1 = QualifiedSchema::new_with_table( + /// Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])), + /// &TableReference::from("t1") + /// ); + /// + /// let schema2 = QualifiedSchema::new_with_table( + /// Arc::new(Schema::new(vec![ + /// Field::new("id", DataType::Int32, false), + /// Field::new("name", DataType::Utf8, false) + /// ])), + /// &TableReference::from("t1") + /// ); + /// + /// schema1.merge(&schema2); + /// // Only "name" is added since "id" already exists + /// assert_eq!(schema1.len(), 2); + /// ``` + pub fn merge(&mut self, other_schema: &QualifiedSchema) { + if other_schema.schema.fields.is_empty() { + return; + } + + let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> = + self.iter().collect(); + let self_unqualified_names: HashSet<&str> = self + .schema + .fields + .iter() + .map(|field| field.name().as_str()) + .collect(); + + let mut schema_builder = SchemaBuilder::from(self.schema.fields.clone()); + let mut qualifiers = Vec::new(); + for (qualifier, field) in other_schema.iter() { + // skip duplicate columns + let duplicated_field = match qualifier { + Some(q) => self_fields.contains(&(Some(q), field)), + // for unqualified columns, check as unqualified name + None => self_unqualified_names.contains(field.name().as_str()), + }; + if !duplicated_field { + schema_builder.push(Arc::clone(field)); + qualifiers.push(qualifier.cloned()); + } + } + let mut metadata = self.schema.metadata.clone(); + metadata.extend(other_schema.schema.metadata.clone()); + + let finished = 
schema_builder.finish(); + let finished_with_metadata = finished.with_metadata(metadata); + self.schema = finished_with_metadata.into(); + self.field_qualifiers.extend(qualifiers); + } +} + +impl DFSchema { + /// Creates an empty `DFSchema` with no fields and no metadata columns. + pub fn empty() -> Self { + Self { + inner: QualifiedSchema::empty(), functional_dependencies: FunctionalDependencies::empty(), + metadata: None, } } + /// Return a reference to the schema for metadata columns. + /// + /// Metadata columns are columns which are meant to be semi-public stores of the internal details of the table. + /// For example, the [`ctid` column in Postgres](https://www.postgresql.org/docs/current/ddl-system-columns.html) + /// or the [`_metadata` column in Spark](https://docs.databricks.com/en/ingestion/file-metadata-column.html). + /// + /// Implementers of [TableProvider](../catalog/trait.TableProvider.html) can use this to declare which columns in the table are "metadata" columns. + /// See also [TableProvider](../catalog/trait.TableProvider.html#method.metadata_columns) for more information or `datafusion/core/tests/sql/metadata_columns.rs` for a full example. + /// + /// Returns: + /// - `&None` for tables that do not have metadata columns. + /// - `&Some(QualifiedSchema)` for tables having metadata columns. + pub fn metadata_schema(&self) -> &Option<QualifiedSchema> { + &self.metadata + } + /// Return a reference to the inner Arrow [`Schema`] /// /// Note this does not have the qualifier information pub fn as_arrow(&self) -> &Schema { - self.inner.as_ref() + self.inner.schema.as_ref() } /// Return a reference to the inner Arrow [`SchemaRef`] /// /// Note this does not have the qualifier information pub fn inner(&self) -> &SchemaRef { - &self.inner + &self.inner.schema } - /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier + /// Set the metadata schema for an existing [`DFSchema`]. + /// Note that this is the schema for the metadata columns (see [DFSchema::metadata_schema]). + /// Not to be confused with the metadata of the schema itself (see [Schema::with_metadata]). + pub fn with_metadata_schema( + mut self, + metadata_schema: Option<QualifiedSchema>, + ) -> Self { + self.metadata = metadata_schema; + self + } + + /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier and the schema has fixed metadata. + /// This is not to be confused with the _metadata schema_ or _metadata columns_ which are a completely different concept. + /// In this method `metadata` refers to the metadata of the schema itself, which is arbitrary key-value pairs. + /// See [Schema::with_metadata] for more information.
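To make the new metadata-schema plumbing concrete, here is a minimal sketch that attaches a metadata column to a `DFSchema` and shows how name lookups report it with the `METADATA_OFFSET` encoding. It only uses APIs introduced in this patch (`with_metadata_schema`, `QualifiedSchema::new_with_table`, `FieldId`) plus the existing `TryFrom<SchemaRef>` conversion; the `_file` column and the `t` qualifier are illustrative:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, FieldId, QualifiedSchema, TableReference};

fn main() -> datafusion_common::Result<()> {
    // Ordinary table columns.
    let df_schema = DFSchema::try_from(Arc::new(Schema::new(vec![Field::new(
        "id",
        DataType::Int32,
        false,
    )])))?;

    // Attach a metadata-column schema (here a single `_file` column).
    let metadata = QualifiedSchema::new_with_table(
        Arc::new(Schema::new(vec![Field::new("_file", DataType::Utf8, false)])),
        &TableReference::from("t"),
    );
    let df_schema = df_schema.with_metadata_schema(Some(metadata));

    // Ordinary columns resolve to their plain index ...
    let id_idx = df_schema.index_of_column_by_name(None, "id").unwrap();
    assert!(matches!(FieldId::from(id_idx), FieldId::Normal(0)));

    // ... while metadata columns come back offset by METADATA_OFFSET.
    let file_idx = df_schema.index_of_column_by_name(None, "_file").unwrap();
    assert!(matches!(FieldId::from(file_idx), FieldId::Metadata(0)));
    assert_eq!(df_schema.field(file_idx).name(), "_file");
    Ok(())
}
```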
pub fn new_with_metadata( qualified_fields: Vec<(Option, Arc)>, metadata: HashMap, @@ -150,9 +556,9 @@ impl DFSchema { let schema = Arc::new(Schema::new_with_metadata(fields, metadata)); let dfschema = Self { - inner: schema, - field_qualifiers: qualifiers, + inner: QualifiedSchema::new(schema, qualifiers).expect("qualifiers and fields should have the same length, we just unzipped them"), functional_dependencies: FunctionalDependencies::empty(), + metadata: None, }; dfschema.check_names()?; Ok(dfschema) @@ -179,9 +585,10 @@ impl DFSchema { let field_count = fields.len(); let schema = Arc::new(Schema::new_with_metadata(fields, metadata)); let dfschema = Self { - inner: schema, - field_qualifiers: vec![None; field_count], + inner: QualifiedSchema::new(schema, vec![None; field_count]) + .expect("qualifiers length is hardcoded to be the same as fields length"), functional_dependencies: FunctionalDependencies::empty(), + metadata: None, }; dfschema.check_names()?; Ok(dfschema) @@ -197,9 +604,13 @@ impl DFSchema { ) -> Result { let qualifier = qualifier.into(); let schema = DFSchema { - inner: schema.clone().into(), - field_qualifiers: vec![Some(qualifier); schema.fields.len()], + inner: QualifiedSchema::new( + schema.clone().into(), + vec![Some(qualifier); schema.fields.len()], + ) + .expect("qualifiers and fields have the same length"), functional_dependencies: FunctionalDependencies::empty(), + metadata: None, }; schema.check_names()?; Ok(schema) @@ -211,9 +622,9 @@ impl DFSchema { schema: &SchemaRef, ) -> Result { let dfschema = Self { - inner: Arc::clone(schema), - field_qualifiers: qualifiers, + inner: QualifiedSchema::new(Arc::clone(schema), qualifiers)?, functional_dependencies: FunctionalDependencies::empty(), + metadata: None, }; dfschema.check_names()?; Ok(dfschema) @@ -224,7 +635,9 @@ impl DFSchema { let mut qualified_names = BTreeSet::new(); let mut unqualified_names = BTreeSet::new(); - for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) { + for (field, qualifier) in + self.inner.fields().iter().zip(&self.inner.field_qualifiers) + { if let Some(qualifier) = qualifier { if !qualified_names.insert((qualifier, field.name())) { return _schema_err!(SchemaError::DuplicateQualifiedField { @@ -254,7 +667,7 @@ impl DFSchema { mut self, functional_dependencies: FunctionalDependencies, ) -> Result { - if functional_dependencies.is_valid(self.inner.fields.len()) { + if functional_dependencies.is_valid(self.inner.schema.fields.len()) { self.functional_dependencies = functional_dependencies; Ok(self) } else { @@ -265,25 +678,68 @@ impl DFSchema { } } + /// Build metadata schema for join operation based on join type + /// + /// # Arguments + /// * `left` - Left input's qualified schema + /// * `right` - Right input's qualified schema + /// * `join_type` - Type of join operation + /// + /// # Returns + /// * `Result>` - The resulting metadata schema after join + /// + /// # Details + /// For different join types: + /// - Left/LeftSemi/LeftAnti/LeftMark joins: Use left schema + /// - Right/RightSemi/RightAnti joins: Use right schema + /// - Inner/Full joins: Join both schemas if they exist + pub fn join_metadata_schema( + left: &Option, + right: &Option, + join_type: &JoinType, + ) -> Result> { + match join_type { + JoinType::LeftSemi | JoinType::LeftMark => Ok(left.clone()), + JoinType::RightSemi => Ok(right.clone()), + _ => { + let ret = match (left, right) { + (Some(left), Some(right)) => Some(left.join(right)?), + (None, Some(right)) => Some(right.clone()), + 
(Some(left), None) => Some(left.clone()), + (None, None) => None, + }; + Ok(ret) + } + } + } + /// Create a new schema that contains the fields from this schema followed by the fields /// from the supplied schema. An error will be returned if there are duplicate field names. - pub fn join(&self, schema: &DFSchema) -> Result { - let mut schema_builder = SchemaBuilder::new(); - schema_builder.extend(self.inner.fields().iter().cloned()); - schema_builder.extend(schema.fields().iter().cloned()); - let new_schema = schema_builder.finish(); - - let mut new_metadata = self.inner.metadata.clone(); - new_metadata.extend(schema.inner.metadata.clone()); - let new_schema_with_metadata = new_schema.with_metadata(new_metadata); - - let mut new_qualifiers = self.field_qualifiers.clone(); - new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice()); + pub fn join_with_type( + &self, + schema: &DFSchema, + join_type: &JoinType, + ) -> Result { + let new_self = Self { + inner: self.inner.join(&schema.inner)?, + functional_dependencies: FunctionalDependencies::empty(), + metadata: DFSchema::join_metadata_schema( + &self.metadata, + &schema.metadata, + join_type, + )?, + }; + new_self.check_names()?; + Ok(new_self) + } + /// Create a new schema that contains the fields from this schema followed by the fields + /// from the supplied schema. An error will be returned if there are duplicate field names. + pub fn join(&self, schema: &DFSchema) -> Result { let new_self = Self { - inner: Arc::new(new_schema_with_metadata), - field_qualifiers: new_qualifiers, + inner: self.inner.join(&schema.inner)?, functional_dependencies: FunctionalDependencies::empty(), + metadata: None, }; new_self.check_names()?; Ok(new_self) @@ -292,79 +748,59 @@ impl DFSchema { /// Modify this schema by appending the fields from the supplied schema, ignoring any /// duplicate fields. 
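The join handling above decides which side's metadata columns survive a join. Below is a small sketch of `join_metadata_schema`, assuming the match arms exactly as written in this patch (only `LeftSemi`/`LeftMark` keep just the left side, `RightSemi` keeps just the right side, everything else joins whatever is present on both sides); the column names and qualifiers are illustrative:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, JoinType, QualifiedSchema, TableReference};

fn main() -> datafusion_common::Result<()> {
    let left = Some(QualifiedSchema::new_with_table(
        Arc::new(Schema::new(vec![Field::new("_rowid", DataType::UInt64, false)])),
        &TableReference::from("l"),
    ));
    let right = Some(QualifiedSchema::new_with_table(
        Arc::new(Schema::new(vec![Field::new("_file", DataType::Utf8, false)])),
        &TableReference::from("r"),
    ));

    // LeftSemi (and LeftMark) joins only produce rows from the left input,
    // so only the left metadata schema is kept.
    let semi = DFSchema::join_metadata_schema(&left, &right, &JoinType::LeftSemi)?;
    assert_eq!(semi.map(|s| s.len()), Some(1));

    // Inner joins concatenate the metadata schemas of both inputs.
    let inner = DFSchema::join_metadata_schema(&left, &right, &JoinType::Inner)?;
    assert_eq!(inner.map(|s| s.len()), Some(2));
    Ok(())
}
```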
pub fn merge(&mut self, other_schema: &DFSchema) { - if other_schema.inner.fields.is_empty() { - return; - } - - let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> = - self.iter().collect(); - let self_unqualified_names: HashSet<&str> = self - .inner - .fields - .iter() - .map(|field| field.name().as_str()) - .collect(); - - let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone()); - let mut qualifiers = Vec::new(); - for (qualifier, field) in other_schema.iter() { - // skip duplicate columns - let duplicated_field = match qualifier { - Some(q) => self_fields.contains(&(Some(q), field)), - // for unqualified columns, check as unqualified name - None => self_unqualified_names.contains(field.name().as_str()), - }; - if !duplicated_field { - schema_builder.push(Arc::clone(field)); - qualifiers.push(qualifier.cloned()); + self.inner.merge(&other_schema.inner); + if let Some(other_metadata) = &other_schema.metadata { + match &mut self.metadata { + Some(metadata) => metadata.merge(other_metadata), + None => self.metadata = Some(other_metadata.clone()), } } - let mut metadata = self.inner.metadata.clone(); - metadata.extend(other_schema.inner.metadata.clone()); - - let finished = schema_builder.finish(); - let finished_with_metadata = finished.with_metadata(metadata); - self.inner = finished_with_metadata.into(); - self.field_qualifiers.extend(qualifiers); } /// Get a list of fields pub fn fields(&self) -> &Fields { - &self.inner.fields + &self.inner.schema.fields } /// Returns an immutable reference of a specific `Field` instance selected using an /// offset within the internal `fields` vector pub fn field(&self, i: usize) -> &Field { - &self.inner.fields[i] + if i >= METADATA_OFFSET { + if let Some(metadata) = &self.metadata { + return metadata.field(i - METADATA_OFFSET); + } + } + self.inner.field(i) } /// Returns an immutable reference of a specific `Field` instance selected using an /// offset within the internal `fields` vector and its qualifier pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) { - (self.field_qualifiers[i].as_ref(), self.field(i)) + if i >= METADATA_OFFSET { + if let Some(metadata) = &self.metadata { + return metadata.qualified_field(i - METADATA_OFFSET); + } + } + self.inner.qualified_field(i) } + /// Get the internal index of a column using it's unqualified name and an optional qualifier. + /// If a non-metadata column is found, it's index is returned. + /// If a metadata column is found, it's index is returned with an offset of `METADATA_OFFSET`. pub fn index_of_column_by_name( &self, qualifier: Option<&TableReference>, name: &str, ) -> Option { - let mut matches = self - .iter() - .enumerate() - .filter(|(_, (q, f))| match (qualifier, q) { - // field to lookup is qualified. - // current field is qualified and not shared between relations, compare both - // qualifier and name. - (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name, - // field to lookup is qualified but current field is unqualified. 
- (Some(_), None) => false, - // field to lookup is unqualified, no need to compare qualifier - (None, Some(_)) | (None, None) => f.name() == name, - }) - .map(|(idx, _)| idx); - matches.next() + if let Some(idx) = self.inner.index_of_column_by_name(qualifier, name) { + return Some(idx); + } + if let Some(metadata) = &self.metadata { + return metadata + .index_of_column_by_name(qualifier, name) + .map(|idx| idx + METADATA_OFFSET); + } + None } /// Find the index of the column with the given qualifier and name, @@ -405,6 +841,15 @@ impl DFSchema { } } + pub fn field_qualifier(&self, i: usize) -> Option<&TableReference> { + if i >= METADATA_OFFSET { + if let Some(metadata) = &self.metadata { + return metadata.field_qualifier(i - METADATA_OFFSET); + } + } + self.inner.field_qualifier(i) + } + /// Find the qualified field with the given name pub fn qualified_field_with_name( &self, @@ -415,7 +860,7 @@ impl DFSchema { let idx = self .index_of_column_by_name(Some(qualifier), name) .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?; - Ok((self.field_qualifiers[idx].as_ref(), self.field(idx))) + Ok((self.field_qualifier(idx), self.field(idx))) } else { self.qualified_field_with_unqualified_name(name) } @@ -442,11 +887,13 @@ impl DFSchema { /// Find all fields that match the given name pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> { - self.fields() - .iter() - .filter(|field| field.name() == name) - .map(|f| f.as_ref()) - .collect() + let mut fields: Vec<&Field> = self.inner.fields_with_unqualified_name(name); + if fields.is_empty() { + if let Some(schema) = self.metadata_schema() { + fields.extend(schema.fields_with_unqualified_name(name)); + } + } + fields } /// Find all fields that match the given name and return them with their qualifier @@ -454,10 +901,14 @@ impl DFSchema { &self, name: &str, ) -> Vec<(Option<&TableReference>, &Field)> { - self.iter() - .filter(|(_, field)| field.name() == name) - .map(|(qualifier, field)| (qualifier, field.as_ref())) - .collect() + let mut fields: Vec<(Option<&TableReference>, &Field)> = + self.inner.qualified_fields_with_unqualified_name(name); + if fields.is_empty() { + if let Some(schema) = self.metadata_schema() { + fields.extend(schema.qualified_fields_with_unqualified_name(name)); + } + } + fields } /// Find all fields that match the given name and convert to column @@ -521,11 +972,18 @@ impl DFSchema { qualifier: &TableReference, name: &str, ) -> Result<&Field> { - let idx = self - .index_of_column_by_name(Some(qualifier), name) - .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?; + let idx = self.index_of_column_by_name(Some(qualifier), name); + if let Some(idx) = idx { + return Ok(self.field(idx)); + } + + if let Some(schema) = &self.metadata { + if let Some(f) = schema.field_with_qualified_name(qualifier, name) { + return Ok(f); + } + } - Ok(self.field(idx)) + Err(field_not_found(Some(qualifier.clone()), name, self)) } /// Find the field with the given qualified column @@ -570,6 +1028,7 @@ impl DFSchema { /// Check to see if unqualified field names matches field names in Arrow schema pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool { self.inner + .schema .fields .iter() .zip(arrow_schema.fields().iter()) @@ -772,20 +1231,24 @@ impl DFSchema { /// Strip all field qualifier in schema pub fn strip_qualifiers(self) -> Self { + let len = self.inner.len(); DFSchema { - field_qualifiers: vec![None; self.inner.fields.len()], - inner: self.inner, + inner: 
QualifiedSchema::new(self.inner.schema, vec![None; len]) + .expect("qualifier length is hardcoded to be the same as fields length"), functional_dependencies: self.functional_dependencies, + metadata: self.metadata, } } /// Replace all field qualifier with new value in schema pub fn replace_qualifier(self, qualifier: impl Into) -> Self { let qualifier = qualifier.into(); + let len = self.inner.len(); DFSchema { - field_qualifiers: vec![Some(qualifier); self.inner.fields.len()], - inner: self.inner, + inner: QualifiedSchema::new(self.inner.schema, vec![Some(qualifier); len]) + .expect("qualifier length is hardcoded to be the same as fields length"), functional_dependencies: self.functional_dependencies, + metadata: self.metadata, } } @@ -798,7 +1261,7 @@ impl DFSchema { /// Get metadata of this schema pub fn metadata(&self) -> &HashMap { - &self.inner.metadata + &self.inner.schema.metadata } /// Get functional dependencies @@ -808,7 +1271,8 @@ impl DFSchema { /// Iterate over the qualifiers and fields in the DFSchema pub fn iter(&self) -> impl Iterator, &FieldRef)> { - self.field_qualifiers + self.inner + .field_qualifiers .iter() .zip(self.inner.fields().iter()) .map(|(qualifier, field)| (qualifier.as_ref(), field)) @@ -818,16 +1282,16 @@ impl DFSchema { impl From for Schema { /// Convert DFSchema into a Schema fn from(df_schema: DFSchema) -> Self { - let fields: Fields = df_schema.inner.fields.clone(); - Schema::new_with_metadata(fields, df_schema.inner.metadata.clone()) + let fields: Fields = df_schema.inner.schema.fields.clone(); + Schema::new_with_metadata(fields, df_schema.inner.schema.metadata.clone()) } } impl From<&DFSchema> for Schema { /// Convert DFSchema reference into a Schema fn from(df_schema: &DFSchema) -> Self { - let fields: Fields = df_schema.inner.fields.clone(); - Schema::new_with_metadata(fields, df_schema.inner.metadata.clone()) + let fields: Fields = df_schema.inner.schema.fields.clone(); + Schema::new_with_metadata(fields, df_schema.inner.schema.metadata.clone()) } } @@ -859,9 +1323,9 @@ impl TryFrom for DFSchema { fn try_from(schema: SchemaRef) -> Result { let field_count = schema.fields.len(); let dfschema = Self { - inner: schema, - field_qualifiers: vec![None; field_count], + inner: QualifiedSchema::new(schema, vec![None; field_count])?, functional_dependencies: FunctionalDependencies::empty(), + metadata: None, }; Ok(dfschema) } @@ -876,8 +1340,8 @@ impl From for SchemaRef { // Hashing refers to a subset of fields considered in PartialEq. 
impl Hash for DFSchema { fn hash(&self, state: &mut H) { - self.inner.fields.hash(state); - self.inner.metadata.len().hash(state); // HashMap is not hashable + self.inner.schema.fields.hash(state); + self.inner.schema.metadata.len().hash(state); // HashMap is not hashable } } @@ -915,9 +1379,9 @@ impl ToDFSchema for Vec { metadata: HashMap::new(), }; let dfschema = DFSchema { - inner: schema.into(), - field_qualifiers: vec![None; field_count], + inner: QualifiedSchema::new(schema.into(), vec![None; field_count])?, functional_dependencies: FunctionalDependencies::empty(), + metadata: None, }; Ok(dfschema) } @@ -932,7 +1396,7 @@ impl Display for DFSchema { .map(|(q, f)| qualified_name(q, f.name())) .collect::>() .join(", "), - self.inner.metadata + self.inner.schema.metadata ) } } @@ -1296,9 +1760,13 @@ mod tests { let arrow_schema_ref = Arc::new(arrow_schema.clone()); let df_schema = DFSchema { - inner: Arc::clone(&arrow_schema_ref), - field_qualifiers: vec![None; arrow_schema_ref.fields.len()], + inner: QualifiedSchema::new( + Arc::clone(&arrow_schema_ref), + vec![None; arrow_schema_ref.fields.len()], + ) + .unwrap(), functional_dependencies: FunctionalDependencies::empty(), + metadata: None, }; let df_schema_ref = Arc::new(df_schema.clone()); @@ -1342,12 +1810,16 @@ mod tests { let schema = Arc::new(Schema::new(vec![a_field, b_field])); let df_schema = DFSchema { - inner: Arc::clone(&schema), - field_qualifiers: vec![None; schema.fields.len()], + inner: QualifiedSchema::new( + Arc::clone(&schema), + vec![None; schema.fields.len()], + ) + .unwrap(), functional_dependencies: FunctionalDependencies::empty(), + metadata: None, }; - assert_eq!(df_schema.inner.metadata(), schema.metadata()) + assert_eq!(df_schema.inner.schema.metadata(), schema.metadata()) } #[test] diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index c50ec64759d5..a2a6d444ebb0 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -177,6 +177,11 @@ pub enum SchemaError { field: Box, valid_fields: Vec, }, + /// Schema contains a different number of fields and field qualifiers + UnmatchedFieldQualifiers { + field_count: usize, + qualifier_count: usize, + }, } impl Display for SchemaError { @@ -256,6 +261,16 @@ impl Display for SchemaError { ) } } + Self::UnmatchedFieldQualifiers { + field_count, + qualifier_count, + } => { + write!( + f, + "Schema contains a different number of fields ({}) and field qualifiers ({})", + field_count, qualifier_count + ) + } } } } diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index df1ae100f581..de643281ae9e 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -54,7 +54,8 @@ pub mod utils; pub use arrow; pub use column::Column; pub use dfschema::{ - qualified_name, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema, + qualified_name, DFSchema, DFSchemaRef, ExprSchema, FieldId, QualifiedSchema, + SchemaExt, ToDFSchema, }; pub use diagnostic::Diagnostic; pub use error::{ diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs index 541e0b6dfa91..68caf76303ff 100644 --- a/datafusion/core/src/datasource/default_table_source.rs +++ b/datafusion/core/src/datasource/default_table_source.rs @@ -59,6 +59,13 @@ impl TableSource for DefaultTableSource { self.table_provider.schema() } + /// Get a reference to the metadata columns for this table. 
+ /// By default this delegate to the table provider, but can be overridden by the table source. + /// See [`crate::datasource::TableProvider::metadata_columns`] for more information. + fn metadata_columns(&self) -> Option { + self.table_provider.metadata_columns() + } + /// Get a reference to applicable constraints, if any exists. fn constraints(&self) -> Option<&Constraints> { self.table_provider.constraints() diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d73b7d81536a..1efe571d3209 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -66,7 +66,7 @@ use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, - ScalarValue, + FieldId, ScalarValue, }; use datafusion_expr::dml::{CopyTo, InsertOp}; use datafusion_expr::expr::{ @@ -1980,7 +1980,12 @@ impl DefaultPhysicalPlanner { match input_schema.index_of_column(col) { Ok(idx) => { // index physical field using logical field index - Ok(input_exec.schema().field(idx).name().to_string()) + match FieldId::from(idx) { + FieldId::Normal(idx) => { + Ok(input_exec.schema().field(idx).name().to_string()) + } + FieldId::Metadata(_) => Ok(String::from(col.name())), + } } // logical column is not a derived column, safe to pass along to // physical_name @@ -2356,14 +2361,15 @@ mod tests { let expected_error: &str = "Error during planning: \ Extension planner for NoOp created an ExecutionPlan with mismatched schema. \ LogicalPlan schema: \ - DFSchema { inner: Schema { fields: \ + DFSchema { inner: QualifiedSchema { schema: Schema { fields: \ [Field { name: \"a\", \ data_type: Int32, \ nullable: false, \ dict_id: 0, \ dict_is_ordered: false, metadata: {} }], \ - metadata: {} }, field_qualifiers: [None], \ - functional_dependencies: FunctionalDependencies { deps: [] } }, \ + metadata: {} }, \ + field_qualifiers: [None] }, \ + functional_dependencies: FunctionalDependencies { deps: [] }, metadata: None }, \ ExecutionPlan schema: Schema { fields: \ [Field { name: \"b\", \ data_type: Int32, \ diff --git a/datafusion/core/tests/sql/metadata_columns.rs b/datafusion/core/tests/sql/metadata_columns.rs new file mode 100644 index 000000000000..c39a2e5f81e8 --- /dev/null +++ b/datafusion/core/tests/sql/metadata_columns.rs @@ -0,0 +1,833 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
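Before the test code, it may help to spell out the field-level metadata convention that the `MetadataColumnTableProvider` defined later in this file relies on: a column is treated as a metadata ("system") column when its Arrow field carries a `datafusion.system_column` entry whose value starts with `t`. This convention is local to the test, not a DataFusion API; a minimal sketch:

```rust
use std::collections::HashMap;
use arrow::datatypes::{DataType, Field};

fn main() {
    // Tag a field so the provider below classifies it as a metadata column.
    let rowid = Field::new("_rowid", DataType::UInt32, true).with_metadata(HashMap::from([(
        "datafusion.system_column".to_string(),
        "true".to_string(),
    )]));
    assert_eq!(
        rowid.metadata().get("datafusion.system_column").map(String::as_str),
        Some("true")
    );

    // Fields without the entry stay ordinary columns.
    let id = Field::new("id", DataType::UInt8, false);
    assert!(id.metadata().get("datafusion.system_column").is_none());
}
```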
+ +use std::any::Any; +use std::fmt::{self, Debug, Formatter}; +use std::sync::{Arc, Mutex}; + +use arrow::array::{ArrayRef, StringArray, UInt64Array}; +use async_trait::async_trait; +use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::{MemTable, TableProvider, TableType}; +use datafusion::error::Result; +use datafusion::execution::context::TaskContext; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::memory::MemoryStream; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, +}; +use datafusion::{assert_batches_sorted_eq, prelude::*}; + +use datafusion::catalog::Session; +use datafusion_common::{record_batch, FieldId}; +use itertools::Itertools; + +/// A User, with an id and a bank account +#[derive(Clone, Debug)] +struct User { + id: u8, + bank_account: u64, +} + +/// A custom datasource, used to represent a datastore with a single index +#[derive(Clone)] +pub struct CustomDataSource { + test_conflict_name: bool, + inner: Arc>, + metadata_columns: SchemaRef, +} + +struct CustomDataSourceInner { + data: Vec, +} + +impl Debug for CustomDataSource { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.write_str("custom_db") + } +} + +impl CustomDataSource { + pub(crate) async fn create_physical_plan( + &self, + projections: Option<&Vec>, + ) -> Result> { + Ok(Arc::new(CustomExec::new( + self.test_conflict_name, + projections, + self.clone(), + ))) + } + + pub(crate) fn populate_users(&self) { + self.add_user(User { + id: 1, + bank_account: 9_000, + }); + self.add_user(User { + id: 2, + bank_account: 100, + }); + self.add_user(User { + id: 3, + bank_account: 1_000, + }); + } + + fn add_user(&self, user: User) { + let mut inner = self.inner.lock().unwrap(); + inner.data.push(user); + } + + fn with_conflict_name(&self) -> Self { + CustomDataSource { + test_conflict_name: true, + inner: self.inner.clone(), + metadata_columns: self.metadata_columns.clone(), + } + } +} + +impl Default for CustomDataSource { + fn default() -> Self { + CustomDataSource { + test_conflict_name: false, + inner: Arc::new(Mutex::new(CustomDataSourceInner { + data: Default::default(), + })), + metadata_columns: Arc::new(Schema::new(vec![ + Field::new("_rowid", DataType::UInt64, false), + Field::new("_file", DataType::Utf8, false), + ])), + } + } +} + +#[async_trait] +impl TableProvider for CustomDataSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + if self.test_conflict_name { + SchemaRef::new(Schema::new(vec![ + Field::new("_file", DataType::UInt8, false), + Field::new("bank_account", DataType::UInt64, true), + ])) + } else { + SchemaRef::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, false), + Field::new("bank_account", DataType::UInt64, true), + ])) + } + } + + fn metadata_columns(&self) -> Option { + Some(self.metadata_columns.clone()) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[Expr], + _limit: Option, + ) -> Result> { + return self.create_physical_plan(projection).await; + } +} + +#[derive(Debug, 
Clone)] +struct CustomExec { + test_conflict_name: bool, + db: CustomDataSource, + projected_schema: SchemaRef, + cache: PlanProperties, +} + +impl CustomExec { + fn new( + test_conflict_name: bool, + projections: Option<&Vec>, + db: CustomDataSource, + ) -> Self { + let schema = db.schema(); + let metadata_schema = db.metadata_columns(); + let projected_schema = match projections { + Some(projection) => { + let projection = projection + .iter() + .map(|idx| match FieldId::from(*idx) { + FieldId::Normal(i) => Arc::new(schema.field(i).clone()), + FieldId::Metadata(i) => { + Arc::new(metadata_schema.as_ref().unwrap().field(i).clone()) + } + }) + .collect_vec(); + Arc::new(Schema::new(projection)) + } + None => schema, + }; + let cache = Self::compute_properties(projected_schema.clone()); + Self { + test_conflict_name, + db, + projected_schema, + cache, + } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties(schema: SchemaRef) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + PlanProperties::new( + eq_properties, + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +impl DisplayAs for CustomExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + write!(f, "CustomExec") + } +} + +impl ExecutionPlan for CustomExec { + fn name(&self) -> &'static str { + "CustomExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + let users: Vec = { + let db = self.db.inner.lock().unwrap(); + db.data.clone() + }; + + let mut id_array = UInt8Builder::with_capacity(users.len()); + let mut account_array = UInt64Builder::with_capacity(users.len()); + let len = users.len() as u64; + + for user in users { + id_array.append_value(user.id); + account_array.append_value(user.bank_account); + } + + let id_array = id_array.finish(); + let account_array = account_array.finish(); + let rowid_array = UInt64Array::from_iter_values(0_u64..len); + let file_array = + StringArray::from_iter_values((0_u64..len).map(|i| format!("file-{}", i))); + + let arrays = self + .projected_schema + .fields + .iter() + .map(|f| match f.name().as_str() { + "_rowid" => Arc::new(rowid_array.clone()) as ArrayRef, + "id" => Arc::new(id_array.clone()) as ArrayRef, + "bank_account" => Arc::new(account_array.clone()) as ArrayRef, + "_file" => { + if self.test_conflict_name { + Arc::new(id_array.clone()) as ArrayRef + } else { + Arc::new(file_array.clone()) as ArrayRef + } + } + _ => panic!("cannot reach here"), + }) + .collect(); + + Ok(Box::pin(MemoryStream::try_new( + vec![RecordBatch::try_new(self.projected_schema.clone(), arrays)?], + self.schema(), + None, + )?)) + } +} + +#[derive(Debug)] +struct MetadataColumnTableProvider { + inner: MemTable, + schema: SchemaRef, + metadata_schema: Option, + schema_indices: Vec, + metadata_indices: Vec, +} + +impl MetadataColumnTableProvider { + fn get_schema( + batch_schema: &SchemaRef, + system_column: bool, + ) -> (Option, Vec) { + let columns = batch_schema + .fields() + .iter() + .enumerate() + .filter(|(_, f)| { + if let Some(v) = f.metadata().get("datafusion.system_column") { + system_column ^ 
(!v.to_lowercase().starts_with("t")) + } else { + system_column ^ true + } + }) + .collect::>(); + if columns.is_empty() { + (None, vec![]) + } else { + ( + Some(Arc::new(Schema::new( + columns + .iter() + .map(|(_, f)| f) + .cloned() + .cloned() + .collect::>(), + ))), + columns.iter().map(|(idx, _)| *idx).collect::>(), + ) + } + } + fn new(batch: RecordBatch) -> Self { + let batch_schema = batch.schema(); + let (schema, schema_indices) = Self::get_schema(&batch_schema, false); + let schema = schema.unwrap(); + let (metadata_schema, metadata_indices) = Self::get_schema(&batch_schema, true); + let inner = MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap(); + Self { + inner, + schema, + metadata_schema, + schema_indices, + metadata_indices, + } + } +} + +#[async_trait::async_trait] +impl TableProvider for MetadataColumnTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn metadata_columns(&self) -> Option { + self.metadata_schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let indices = match projection { + Some(projection) => projection + .iter() + .map(|idx| match FieldId::from(*idx) { + FieldId::Normal(i) => self.schema_indices[i], + FieldId::Metadata(i) => self.metadata_indices[i], + }) + .collect::>(), + None => self.schema_indices.clone(), + }; + self.inner.scan(state, Some(&indices), filters, limit).await + } +} + +#[tokio::test] +async fn select_conflict_name() { + // when reading csv, json or parquet, normal column name may be same as metadata column name, + // metadata column name should be suppressed. 
+ let ctx = SessionContext::new_with_config( + SessionConfig::new().with_information_schema(true), + ); + let db = CustomDataSource::default().with_conflict_name(); + db.populate_users(); + ctx.register_table("test", Arc::new(db)).unwrap(); + // disallow ddl + let options = SQLOptions::new().with_allow_ddl(false); + + let show_columns = "show columns from test;"; + let df_columns = ctx.sql_with_options(show_columns, options).await.unwrap(); + + let batchs = df_columns + .select(vec![col("column_name"), col("data_type")]) + .unwrap() + .collect() + .await + .unwrap(); + let expected = [ + "+--------------+-----------+", + "| column_name | data_type |", + "+--------------+-----------+", + "| _file | UInt8 |", + "| bank_account | UInt64 |", + "+--------------+-----------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select0 = "SELECT _file FROM test"; + let df = ctx.sql_with_options(select0, options).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+-------+", + "| _file |", + "+-------+", + "| 1 |", + "| 2 |", + "| 3 |", + "+-------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); +} + +#[tokio::test] +async fn select_metadata_column() { + // Verify SessionContext::with_sql_options errors appropriately + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_information_schema(true), + ); + let db = CustomDataSource::default(); + db.populate_users(); + ctx.register_table("test", Arc::new(db)).unwrap(); + // disallow ddl + let options = SQLOptions::new().with_allow_ddl(false); + + let show_columns = "show columns from test;"; + let df_columns = ctx.sql_with_options(show_columns, options).await.unwrap(); + let batchs = df_columns + .select(vec![col("column_name"), col("data_type")]) + .unwrap() + .collect() + .await + .unwrap(); + let expected = [ + "+--------------+-----------+", + "| column_name | data_type |", + "+--------------+-----------+", + "| id | UInt8 |", + "| bank_account | UInt64 |", + "+--------------+-----------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select0 = "SELECT * FROM test order by id"; + let df = ctx.sql_with_options(select0, options).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+----+--------------+", + "| id | bank_account |", + "+----+--------------+", + "| 1 | 9000 |", + "| 2 | 100 |", + "| 3 | 1000 |", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select1 = "SELECT _rowid FROM test order by _rowid"; + let df = ctx.sql_with_options(select1, options).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+", + "| _rowid |", + "+--------+", + "| 0 |", + "| 1 |", + "| 2 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select2 = "SELECT _rowid, id FROM test order by _rowid"; + let df = ctx.sql_with_options(select2, options).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select3 = "SELECT _rowid, id FROM test WHERE _rowid = 0"; + let df = ctx.sql_with_options(select3, options).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let 
select4 = "SELECT _rowid FROM test LIMIT 1"; + let df = ctx.sql_with_options(select4, options).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+", + "| _rowid |", + "+--------+", + "| 0 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select5 = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1"; + let df = ctx.sql_with_options(select5, options).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 1 | 2 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select6 = "SELECT _rowid, _file FROM test order by _rowid"; + let df = ctx.sql_with_options(select6, options).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+--------+", + "| _rowid | _file |", + "+--------+--------+", + "| 0 | file-0 |", + "| 1 | file-1 |", + "| 2 | file-2 |", + "+--------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let batch = record_batch!( + ("other_id", UInt8, vec![1, 2, 3]), + ("bank_account", UInt64, vec![9, 10, 11]), + ("_rowid", UInt32, vec![10, 11, 12]) // not a system column! + ) + .unwrap(); + let _ = ctx + .register_table("test2", Arc::new(MetadataColumnTableProvider::new(batch))) + .unwrap(); + + // Normally _rowid would be a name conflict and throw an error during planning. + // But when it's a conflict between a system column and a non system column, + // the non system column should be used. + let select7 = + "SELECT id, other_id, _rowid FROM test INNER JOIN test2 ON id = other_id"; + let df = ctx.sql(select7).await.unwrap(); + let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+--------+", + "| id | other_id | _rowid |", + "+----+----------+--------+", + "| 1 | 1 | 10 |", + "| 2 | 2 | 11 |", + "| 3 | 3 | 12 |", + "+----+----------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + // Sanity check: for other columns we do get a conflict + let select7 = + "SELECT id, other_id, bank_account FROM test INNER JOIN test2 ON id = other_id"; + assert!(ctx.sql(select7).await.is_err()); + + // Demonstrate that we can join on _rowid + let batch = record_batch!( + ("other_id", UInt8, vec![2, 3, 4]), + ("_rowid", UInt32, vec![2, 3, 4]) + ) + .unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("other_id", DataType::UInt8, true), + Field::new("_rowid", DataType::UInt32, true).with_metadata( + [("datafusion.system_column".to_string(), "true".to_string())] + .iter() + .cloned() + .collect(), + ), + ]))) + .unwrap(); + let _ = ctx + .register_table("test3", Arc::new(MetadataColumnTableProvider::new(batch))) + .unwrap(); + + let select8 = "SELECT id, other_id, test._rowid FROM test JOIN test3 ON test._rowid = test3._rowid"; + let df = ctx.sql(select8).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+--------+", + "| id | other_id | _rowid |", + "+----+----------+--------+", + "| 3 | 2 | 2 |", + "+----+----------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // Once passed through a projection, system columns are no longer available + let select9 = r" + WITH cte AS (SELECT * FROM test) + SELECT * FROM cte + "; + let df = ctx.sql(select9).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------------+", + "| id 
| bank_account |", + "+----+--------------+", + "| 1 | 9000 |", + "| 2 | 100 |", + "| 3 | 1000 |", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + let select10 = r" + WITH cte AS (SELECT * FROM test) + SELECT _rowid FROM cte + "; + let df = ctx.sql(select10).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+", + "| _rowid |", + "+--------+", + "| 0 |", + "| 1 |", + "| 2 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + let select11 = r" + WITH cte AS (SELECT id FROM test) + SELECT _rowid, id FROM cte + "; + let df = ctx.sql(select11).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + let select12 = r" + WITH cte AS (SELECT id FROM test) + SELECT id, _rowid FROM cte + "; + let df = ctx.sql(select12).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------+", + "| id | _rowid |", + "+----+--------+", + "| 1 | 0 |", + "| 2 | 1 |", + "| 3 | 2 |", + "+----+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // And if passed explicitly selected and passed through a projection + // they are no longer system columns. + let select13 = r" + WITH cte AS (SELECT id, _rowid FROM test) + SELECT * FROM cte + "; + let df = ctx.sql(select13).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------+", + "| id | _rowid |", + "+----+--------+", + "| 1 | 0 |", + "| 2 | 1 |", + "| 3 | 2 |", + "+----+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // test dataframe api + let tb = ctx.table("test").await.unwrap(); + let df = tb + .select(vec![col("_rowid")]) + .unwrap() + .sort_by(vec![col("_rowid")]) + .unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+", + "| _rowid |", + "+--------+", + "| 0 |", + "| 1 |", + "| 2 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + // propagate metadata columns through Project + let tb = ctx.table("test").await.unwrap(); + let df = tb + .select(vec![col("id")]) + .unwrap() + .select(vec![col("_rowid")]) + .unwrap() + .sort_by(vec![col("_rowid")]) + .unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+", + "| _rowid |", + "+--------+", + "| 0 |", + "| 1 |", + "| 2 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + // propagate metadata columns through Filter + let select14 = "select _rowid, id from test where id = 2"; + let tb = ctx.table("test").await.unwrap(); + let df = tb + .filter(col("id").eq(lit(2))) + .unwrap() + .select(vec![col("_rowid"), col("id")]) + .unwrap(); + let df2 = ctx.sql(select14).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let batchs2 = df2.collect().await.unwrap(); + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 1 | 2 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + assert_batches_sorted_eq!(expected, &batchs2); + + // propagate metadata columns through Sort + let select15 = "select _rowid, id from test order by id"; + let tb = ctx.table("test").await.unwrap(); + let df = tb + .sort_by(vec![col("id")]) + .unwrap() + 
.select(vec![col("_rowid"), col("id")]) + .unwrap(); + let df2 = ctx.sql(select15).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let batchs2 = df2.collect().await.unwrap(); + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + assert_batches_sorted_eq!(expected, &batchs2); + + // propagate metadata columns through SubqueryAlias if child is leaf node + let tb = ctx.table("test").await.unwrap(); + let select16 = "SELECT _rowid FROM test sbq order by id"; + let df = tb + .alias("sbq") + .unwrap() + .select(vec![col("_rowid")]) + .unwrap() + .sort_by(vec![col("id")]) + .unwrap(); + let df2 = ctx.sql_with_options(select16, options).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let batchs2 = df2.collect().await.unwrap(); + let expected = [ + "+--------+", + "| _rowid |", + "+--------+", + "| 0 |", + "| 1 |", + "| 2 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + assert_batches_sorted_eq!(expected, &batchs2); +} diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 03c4ad7c013e..c4ec9c516d1e 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -61,6 +61,7 @@ pub mod aggregates; pub mod create_drop; pub mod explain_analyze; pub mod joins; +mod metadata_columns; mod path_partition; pub mod select; mod sql_api; diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index da30f2d7a712..0007c799b6a6 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1512,7 +1512,12 @@ pub fn build_join_schema( .into_iter() .chain(right.metadata().clone()) .collect(); - let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?; + let mut dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?; + dfschema = dfschema.with_metadata_schema(DFSchema::join_metadata_schema( + left.metadata_schema(), + right.metadata_schema(), + join_type, + )?); dfschema.with_functional_dependencies(func_dependencies) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 870b0751c923..391ecdba665b 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -55,9 +55,9 @@ use datafusion_common::tree_node::{ }; use datafusion_common::{ aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, - DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence, - FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference, - UnnestOptions, + DFSchema, DFSchemaRef, DataFusionError, Dependency, FieldId, FunctionalDependence, + FunctionalDependencies, ParamValues, QualifiedSchema, Result, ScalarValue, + TableReference, UnnestOptions, }; use indexmap::IndexSet; @@ -2184,7 +2184,8 @@ pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result, _> = p + .iter() + .map(|i| match FieldId::from(*i) { + FieldId::Metadata(i) => { + if let Some(metadata) = &metadata { + Ok(( + Some(table_name.clone()), + Arc::new(metadata.field(i).clone()), + )) + } else { + plan_err!("table doesn't support metadata column") + } + } + FieldId::Normal(i) => Ok(( + Some(table_name.clone()), + Arc::new(schema.field(i).clone()), + )), + }) + .collect(); let df_schema = DFSchema::new_with_metadata( - p.iter() - .map(|i| { - 
(Some(table_name.clone()), Arc::new(schema.field(*i).clone())) - }) - .collect(), + qualified_fields?, schema.metadata.clone(), )?; df_schema.with_functional_dependencies(projected_func_dependencies) @@ -2620,6 +2642,11 @@ impl TableScan { DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?; df_schema.with_functional_dependencies(func_dependencies) })?; + if let Some(metadata) = metadata { + projected_schema = projected_schema.with_metadata_schema(Some( + QualifiedSchema::new_with_table(metadata, &table_name), + )); + } let projected_schema = Arc::new(projected_schema); Ok(Self { diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs index d6155cfb5dc0..ea3aa8801798 100644 --- a/datafusion/expr/src/table_source.rs +++ b/datafusion/expr/src/table_source.rs @@ -97,6 +97,11 @@ pub trait TableSource: Sync + Send { /// Get a reference to the schema for this table fn schema(&self) -> SchemaRef; + /// Get metadata columns of this table. + fn metadata_columns(&self) -> Option { + None + } + /// Get primary key indices, if any fn constraints(&self) -> Option<&Constraints> { None diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 0b4080abcb55..65ad27e05813 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -101,6 +101,7 @@ use datafusion::error::Result; pub struct FFI_TableProvider { /// Return the table schema pub schema: unsafe extern "C" fn(provider: &Self) -> WrappedSchema, + pub metadata_columns: unsafe extern "C" fn(provider: &Self) -> ROption, /// Perform a scan on the table. See [`TableProvider`] for detailed usage information. /// @@ -173,6 +174,15 @@ unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedS provider.schema().into() } +unsafe extern "C" fn metadata_columns_fn_wrapper( + provider: &FFI_TableProvider, +) -> ROption { + let private_data = provider.private_data as *const ProviderPrivateData; + let provider = &(*private_data).provider; + + provider.metadata_columns().map(|s| s.into()).into() +} + unsafe extern "C" fn table_type_fn_wrapper( provider: &FFI_TableProvider, ) -> FFI_TableType { @@ -334,6 +344,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_Table FFI_TableProvider { schema: schema_fn_wrapper, + metadata_columns: metadata_columns_fn_wrapper, scan: scan_fn_wrapper, table_type: table_type_fn_wrapper, supports_filters_pushdown: provider.supports_filters_pushdown, @@ -362,6 +373,7 @@ impl FFI_TableProvider { Self { schema: schema_fn_wrapper, + metadata_columns: metadata_columns_fn_wrapper, scan: scan_fn_wrapper, table_type: table_type_fn_wrapper, supports_filters_pushdown: match can_support_pushdown_filters { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index b7dd391586a1..3febd85e7c6e 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use datafusion_common::{ get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column, - HashMap, JoinType, Result, + FieldId, HashMap, JoinType, Result, }; use datafusion_expr::expr::Alias; use datafusion_expr::Unnest; @@ -251,11 +251,15 @@ fn optimize_projections( fetch, projected_schema: _, } = table_scan; - // Get indices referred to in the original (schema with all fields) // given projected indices. 
let projection = match &projection { - Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), + Some(projection) => { + indices.into_mapped_indices(|idx| match FieldId::from(idx) { + FieldId::Normal(idx) => projection[idx], + FieldId::Metadata(_) => idx, + }) + } None => indices.into_inner(), }; return TableScan::try_new( @@ -354,8 +358,16 @@ fn optimize_projections( } LogicalPlan::Join(join) => { let left_len = join.left.schema().fields().len(); - let (left_req_indices, right_req_indices) = - split_join_requirements(left_len, indices, &join.join_type); + let left_metadata_column_len = match join.left.schema().metadata_schema() { + Some(schema) => schema.len(), + None => 0, + }; + let (left_req_indices, right_req_indices) = split_join_requirements( + left_len, + left_metadata_column_len, + indices, + &join.join_type, + ); let left_indices = left_req_indices.with_plan_exprs(&plan, join.left.schema())?; let right_indices = @@ -582,7 +594,13 @@ fn rewrite_expr(expr: Expr, input: &Projection) -> Result> { // * the current column is an expression "f" // // return the expression `d + e` (not `d + e` as f) - let input_expr = input.expr[idx].clone().unalias_nested().data; + let input_expr = match FieldId::from(idx) { + FieldId::Metadata(_) => { + let (relation, field) = input.schema.qualified_field(idx); + Expr::Column(Column::new(relation.cloned(), field.name().clone())) + } + FieldId::Normal(idx) => input.expr[idx].clone().unalias_nested().data, + }; Ok(Transformed::yes(input_expr)) } // Unsupported type for consecutive projection merge analysis. @@ -672,6 +690,7 @@ fn outer_columns_helper_multi<'a, 'b>( /// adjusted based on the join type. fn split_join_requirements( left_len: usize, + left_metadata_column_len: usize, indices: RequiredIndices, join_type: &JoinType, ) -> (RequiredIndices, RequiredIndices) { @@ -684,7 +703,7 @@ fn split_join_requirements( | JoinType::LeftMark => { // Decrease right side indices by `left_len` so that they point to valid // positions within the right child: - indices.split_off(left_len) + indices.split_off_with_metadata(left_len, left_metadata_column_len) } // All requirements can be re-routed to left child directly. JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()), @@ -747,9 +766,23 @@ fn rewrite_projection_given_requirements( config: &dyn OptimizerConfig, indices: &RequiredIndices, ) -> Result> { - let Projection { expr, input, .. } = proj; - - let exprs_used = indices.get_at_indices(&expr); + let Projection { + expr, + input, + schema, + .. 
+ } = proj; + let exprs_used = indices + .indices() + .iter() + .map(|&idx| match FieldId::from(idx) { + FieldId::Metadata(_) => { + let (relation, field) = schema.qualified_field(idx); + Expr::Column(Column::new(relation.cloned(), field.name().clone())) + } + FieldId::Normal(idx) => expr[idx].clone(), + }) + .collect::<Vec<_>>(); + let required_indices = RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter()); diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index c1e0885c9b5f..1f92f3c651e7 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -19,7 +19,7 @@ use crate::optimize_projections::outer_columns; use datafusion_common::tree_node::TreeNodeRecursion; -use datafusion_common::{Column, DFSchemaRef, Result}; +use datafusion_common::{Column, DFSchemaRef, FieldId, Result}; use datafusion_expr::{Expr, LogicalPlan}; /// Represents columns in a schema which are required (used) by a plan node @@ -150,7 +150,6 @@ impl RequiredIndices { self.indices.extend_from_slice(indices); self.compact() } - /// Splits this instance into a tuple with two instances: /// * The first `n` indices /// * The remaining indices, adjusted down by n @@ -159,6 +158,24 @@ impl RequiredIndices { (l, r.map_indices(|idx| idx - n)) } + + /// Splits this instance into a tuple with two instances: + /// * The first contains the first `n` normal indices and the first `metadata_n` metadata indices + /// * The second contains the remaining indices, adjusted down by `n` (normal) or `metadata_n` (metadata) + pub fn split_off_with_metadata(self, n: usize, metadata_n: usize) -> (Self, Self) { + let (l, r) = self.partition(|idx| match FieldId::from(idx) { + FieldId::Normal(idx) => idx < n, + FieldId::Metadata(idx) => idx < metadata_n, + }); + ( + l, + r.map_indices(|idx| match FieldId::from(idx) { + FieldId::Normal(idx) => idx - n, + FieldId::Metadata(idx) => idx - metadata_n, + }), + ) + } + + /// Partitions the indices in this instance into two groups based on the /// given predicate function `f`. 
fn partition(&self, f: F) -> (Self, Self) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 49bce3c1ce82..bfaf84d272b6 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -550,21 +550,22 @@ mod tests { Check optimizer-specific invariants after optimizer rule: get table_scan rule\n\ caused by\n\ Internal error: Failed due to a difference in schemas, \ - original schema: DFSchema { inner: Schema { \ + original schema: DFSchema { inner: QualifiedSchema { schema: Schema { \ fields: [], \ metadata: {} }, \ - field_qualifiers: [], \ - functional_dependencies: FunctionalDependencies { deps: [] } \ + field_qualifiers: [] }, \ + functional_dependencies: FunctionalDependencies { deps: [] }, \ + metadata: None \ }, \ - new schema: DFSchema { inner: Schema { \ + new schema: DFSchema { inner: QualifiedSchema { schema: Schema { \ fields: [\ Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \ Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \ Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }\ ], \ metadata: {} }, \ - field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })], \ - functional_dependencies: FunctionalDependencies { deps: [] } }", + field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })] }, \ + functional_dependencies: FunctionalDependencies { deps: [] }, metadata: None }", )); } diff --git a/datafusion/sql/src/relation/join.rs b/datafusion/sql/src/relation/join.rs index 88665401dc31..340430633369 100644 --- a/datafusion/sql/src/relation/join.rs +++ b/datafusion/sql/src/relation/join.rs @@ -118,7 +118,8 @@ impl SqlToRel<'_, S> { ) -> Result { match constraint { JoinConstraint::On(sql_expr) => { - let join_schema = left.schema().join(right.schema())?; + let join_schema = + left.schema().join_with_type(right.schema(), &join_type)?; // parse ON expression let expr = self.sql_to_expr(sql_expr, &join_schema, planner_context)?; LogicalPlanBuilder::from(left)
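
Note on the projection index encoding used throughout this patch: `FieldId::from(idx)` is what lets a single `Vec<usize>` projection address both normal and metadata columns (in `scan`, `TableScan`, and the optimizer rules above). The patch excerpt does not show the encoding itself, so the sketch below assumes a simple offset-based split; the constant `METADATA_OFFSET` and the `From<usize>` impl are illustrative only, not the actual `datafusion_common::FieldId` definition.

// Hypothetical illustration of a FieldId-style index encoding (assumed, not the crate's code).
const METADATA_OFFSET: usize = usize::MAX >> 1; // assumed split point between normal and metadata indices

#[derive(Debug, PartialEq)]
enum FieldId {
    Normal(usize),
    Metadata(usize),
}

impl From<usize> for FieldId {
    fn from(idx: usize) -> Self {
        if idx >= METADATA_OFFSET {
            FieldId::Metadata(idx - METADATA_OFFSET)
        } else {
            FieldId::Normal(idx)
        }
    }
}

fn main() {
    // Plain indices stay "normal"; indices above the offset decode to metadata columns.
    assert_eq!(FieldId::from(0), FieldId::Normal(0));
    assert_eq!(FieldId::from(METADATA_OFFSET + 1), FieldId::Metadata(1));
    println!("FieldId mapping behaves as sketched");
}

Under an encoding like this, providers can keep accepting ordinary `usize` projections while still distinguishing `_rowid`/`_file` from ordinary fields.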
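The `MetadataColumnTableProvider` above recognizes metadata columns purely through field-level metadata keyed by `datafusion.system_column`, the same convention exercised by the `test3` batch in `select_metadata_column`. A minimal, self-contained sketch of tagging a field that way using only arrow-rs APIs:

// Sketch: mark `_rowid` as a system/metadata column via field metadata.
use std::collections::HashMap;
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let rowid = Field::new("_rowid", DataType::UInt32, true).with_metadata(HashMap::from([(
        "datafusion.system_column".to_string(),
        "true".to_string(),
    )]));
    let schema = Arc::new(Schema::new(vec![
        Field::new("other_id", DataType::UInt8, true),
        rowid,
    ]));
    // MetadataColumnTableProvider::get_schema(&schema, true) would then report `_rowid`
    // as the only metadata column, and `other_id` as the only normal column.
    println!("{schema:#?}");
}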
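The new `RequiredIndices::split_off_with_metadata` routes join requirements to the correct child by partitioning normal and metadata indices separately and shifting the right-child indices down. A stand-alone sketch of that splitting rule; the `Idx` enum and local `split_off_with_metadata` function here are illustrations, not the crate's `RequiredIndices` type:

// Sketch of the join-requirement splitting rule on already-decoded indices.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Idx {
    Normal(usize),
    Metadata(usize),
}

fn split_off_with_metadata(indices: Vec<Idx>, n: usize, metadata_n: usize) -> (Vec<Idx>, Vec<Idx>) {
    // Indices that belong to the left join child stay put; the rest are shifted
    // down so they become valid indices into the right child's schema.
    let (left, rest): (Vec<_>, Vec<_>) = indices.into_iter().partition(|idx| match idx {
        Idx::Normal(i) => *i < n,
        Idx::Metadata(i) => *i < metadata_n,
    });
    let right: Vec<Idx> = rest
        .into_iter()
        .map(|idx| match idx {
            Idx::Normal(i) => Idx::Normal(i - n),
            Idx::Metadata(i) => Idx::Metadata(i - metadata_n),
        })
        .collect();
    (left, right)
}

fn main() {
    // Assume the left child exposes 2 normal columns and 1 metadata column.
    let required = vec![Idx::Normal(0), Idx::Normal(3), Idx::Metadata(0), Idx::Metadata(2)];
    let (left, right) = split_off_with_metadata(required, 2, 1);
    assert_eq!(left, vec![Idx::Normal(0), Idx::Metadata(0)]);
    assert_eq!(right, vec![Idx::Normal(1), Idx::Metadata(1)]);
    println!("left: {left:?}, right: {right:?}");
}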