This page explains how to create Unity Catalog tables with Apache Spark™.
Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
Integrating Apache Spark with Unity Catalog offers significant advantages over traditional catalog solutions. Unity Catalog provides unified governance across both data and AI assets, fine-grained access control down to the column level, automated data lineage tracking, and seamless interoperability with various lakehouse formats and compute engines. It enables centralized metadata management, simplified data discovery, and enhanced security. The credential vending capability of Unity Catalog is particularly noteworthy as it allows Apache Spark to securely access data stored in Unity Catalog through a controlled mechanism.
- Neatly organizing data into tables and volumes in the Unity Catalog hierarchy makes it much easier to write Spark code.
- It decouples business logic from file paths.
- It provides easy access to different file formats without end users needing to know how the data is stored.
!!! warning "Prerequisites" For Apache Spark and Delta Lake to work together with Unity Catalog, you will need atleast Apache Spark 3.5.3 and Delta Lake 3.2.1.
The following steps are required to download and configure Unity Catalog for Apache Spark.
Download the latest version of Apache Spark (>= 3.5.3) or use the following commands:

```bash
curl -O https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar xzf spark-3.5.3-bin-hadoop3.tgz
```
To have Unity Catalog work with cloud object storage as the storage location for tables, add the following configuration to `etc/conf/server.properties`:
=== "AWS S3"
```bash
## S3 Storage Config (Multiple configs can be added by incrementing the index)
s3.bucketPath.0=<S3_BUCKET>
s3.region.0=<S3_REGION>
s3.awsRoleArn.0=<S3_ROLE>
# Optional (If blank, it will use DefaultCredentialsProviderChain)
s3.accessKey.0=<SECRET>
s3.secretKey.0=<SECRET>
```
=== "Azure ADLSgen2"
```bash
## ADLS Storage Config (Multiple configs can be added by incrementing the index)
adls.storageAccountName.0=<ADLS_STORAGEACCOUNTNAME>
adls.tenantId.0=<ADLS_TENANTID>
adls.clientId.0=<ADLS_CLIENTID>
adls.clientSecret.0=<SECRET>
```
=== "Google Cloud Storage"
```bash
## GCS Storage Config (Multiple configs can be added by incrementing the index)
gcs.bucketPath.0=<GCS_BUCKET>
# Optional (If blank, it will use Default Application chain to find credentials)
gcs.jsonKeyFilePath.0=/path/to/<SECRET>/gcp-key-uc-testing.json
```
If the UC server is already running, restart it so it picks up the cloud storage server properties:

```bash
cd unitycatalog/
bin/start-uc-server
```
Let’s start running some Spark SQL queries in the Spark SQL shell (`bin/spark-sql`) or PySpark shell (`bin/pyspark`) within the terminal of your Apache Spark 3.5.3 folder against your local UC.
=== "Spark SQL"
```bash
bin/spark-sql --name "local-uc-test" \
--master "local[*]" \
--packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
```
=== "PySpark"
```bash
bin/pyspark --name "local-uc-test" \
--master "local[*]" \
--packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
```
!!! tip "Tip"

    Initially, this may take a few minutes to run while the necessary dependencies are downloaded. Afterwards, you can run some quick commands to see your UC assets within the Spark SQL shell.
Notice the following packages (`--packages`) and configurations (`--conf`); a standalone PySpark equivalent is sketched after this list.

- `--packages` points to the delta-spark and unitycatalog-spark packages; update the version numbers to your current versions.
- `spark.sql.catalog.unity.uri` points to your local development UC instance.
- `spark.sql.catalog.unity.token` is empty, indicating there is no authentication; refer to auth for more information.
- `spark.sql.defaultCatalog=unity` must be filled out to indicate the default catalog.
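If you prefer to launch Spark from a standalone Python script rather than through the shell flags above, the same options can be set on `SparkSession.builder`. This is a minimal sketch, assuming `pyspark` is installed in your Python environment, the same package versions as above, and a local UC server at `http://localhost:8080`:

```python
from pyspark.sql import SparkSession

# Same settings as the shell flags above, expressed as builder configs
spark = (
    SparkSession.builder.appName("local-uc-test")
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity.uri", "http://localhost:8080")
    .config("spark.sql.catalog.unity.token", "")
    .config("spark.sql.defaultCatalog", "unity")
    .getOrCreate()
)

# Quick sanity check against the local UC server
spark.sql("SHOW SCHEMAS").show()
```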
??? note "Three-part and two-part naming conventions"

    As noted in [Unity Catalog 101](https://www.unitycatalog.io/blogs/unity-catalog-oss), UC has a three-part naming convention of [`catalog`].[`schema`].[`asset`]. In the following examples, you can use the three-part notation such as `SELECT * FROM unity.default.marksheet;` or the two-part notation `SELECT * FROM default.marksheet;` as the `defaultCatalog` is already configured.
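For example, in the PySpark shell started above, both notations return the same rows (a small illustrative sketch):

```python
# Three-part notation: catalog.schema.table
spark.sql("SELECT * FROM unity.default.marksheet LIMIT 5").show()

# Two-part notation: works because spark.sql.defaultCatalog=unity is set
spark.sql("SELECT * FROM default.marksheet LIMIT 5").show()
```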
If you would like to run this against cloud object storage, use the following versions of the `bin/spark-sql` shell command.
=== "AWS S3"
```bash
bin/spark-sql --name "s3-uc-test" \
--master "local[*]" \
--packages "org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
```
=== "Azure ADLSgen2"
```bash
bin/spark-sql --name "azure-uc-test" \
--master "local[*]" \
--packages "org.apache.hadoop:hadoop-azure:3.3.6,io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
```
=== "Google Cloud Storage"
```bash
bin/spark-sql --name "gcs-uc-test" \
--master "local[*]" \
--jars "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/3.0.2/gcs-connector-3.0.2-shaded.jar" \
--packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" \
--conf "spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
```
Let’s start by running some quick commands from the Spark SQL and PySpark shells.
The following `SHOW SCHEMAS` command shows the `default` schema that is included in the initial UC configuration.
=== "Spark SQL"
```sql
-- Show schemas (output = default)
SHOW SCHEMAS;
-- Show tables
SHOW TABLES IN default;
```
=== "PySpark"
```python
# Show schemas (output = default)
spark.sql("SHOW SCHEMAS").show()
# Show tables
spark.sql("SHOW TABLES IN default").show()
```
with the output similar to:

```
+---------+-----------------+-----------+
|namespace|        tableName|isTemporary|
+---------+-----------------+-----------+
|  default|        marksheet|      false|
|  default|marksheet_uniform|      false|
|  default|          numbers|      false|
|  default|   user_countries|      false|
+---------+-----------------+-----------+
```
Let’s query the first five rows of the `marksheet` table.
=== "Spark SQL"
```sql
SELECT * FROM default.marksheet LIMIT 5;
```
=== "PySpark"
```python
spark.sql("SELECT * FROM default.marksheet LIMIT 5;").show()
```
With the output looking similar to the following:

```
+---+----------+-----+
| id|      name|marks|
+---+----------+-----+
|  1|nWYHawtqUw|  930|
|  2|uvOzzthsLV|  166|
|  3|WIAehuXWkv|  170|
|  4|wYCSvnJKTo|  709|
|  5|VsslXsUIDZ|  993|
+---+----------+-----+
```
Let’s extend this example by executing various CRUD operations on our UC tables.
=== "Spark SQL"
```sql
-- Create new schema
CREATE SCHEMA demo;
-- Should now show two schemas: default and demo
SHOW SCHEMAS;
```
=== "PySpark"
```python
# Create new schema
spark.sql("CREATE SCHEMA demo")
# Should now show two schemas: default and demo
spark.sql("SHOW SCHEMAS").show()
```
=== "Spark SQL"
```sql
-- Create a new table
CREATE TABLE demo.mytable (id INT, desc STRING)
USING delta
LOCATION '<LOCATION>';
-- Example location:
-- LOCATION '/tmp/tables/mytable';
```
=== "PySpark"
```python
# Create a new table
spark.sql("""
CREATE TABLE demo.mytable (id INT, desc STRING)
USING delta
LOCATION '<LOCATION>'
""")
# Example location:
# LOCATION '/tmp/tables/mytable'
```
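Tables can also be registered through the DataFrame writer API instead of SQL DDL. The following is a sketch, assuming the PySpark shell session above; the table name `demo.mytable_df` and its location are hypothetical examples, and the `path` option mirrors the `LOCATION` clause used above:

```python
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# A tiny DataFrame matching the (id INT, desc STRING) schema used above
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("desc", StringType(), True),
])
df = spark.createDataFrame([(10, "from the DataFrame API")], schema)

# Register the data as a Delta table in UC; the path mirrors the LOCATION clause
(
    df.write.format("delta")
    .option("path", "/tmp/tables/mytable_df")
    .saveAsTable("demo.mytable_df")
)
```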
=== "Spark SQL"
```sql
-- Insert new rows
INSERT INTO demo.mytable VALUES (1, "test 1");
INSERT INTO demo.mytable VALUES (2, "test 2");
INSERT INTO demo.mytable VALUES (3, "test 3");
INSERT INTO demo.mytable VALUES (4, "test 4");
-- Read table
SELECT * FROM demo.mytable;
```
=== "PySpark"
```python
# Insert new rows
spark.sql("INSERT INTO demo.mytable VALUES (1, 'test 1')")
spark.sql("INSERT INTO demo.mytable VALUES (2, 'test 2')")
spark.sql("INSERT INTO demo.mytable VALUES (3, 'test 3')")
spark.sql("INSERT INTO demo.mytable VALUES (4, 'test 4')")
# Read table
spark.sql("SELECT * FROM demo.mytable").show()
```
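The same rows are also reachable through the DataFrame API; a short sketch, assuming the PySpark shell session above:

```python
# Load the UC table as a DataFrame and filter it
df = spark.table("demo.mytable")
df.filter(df.id > 2).show()
```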
=== "Spark SQL"
```sql
-- Update row in table
UPDATE demo.mytable SET id = 5 WHERE id = 4;
```
=== "PySpark"
```python
# Update row in table
spark.sql("UPDATE demo.mytable SET id = 5 WHERE id = 4")
```
=== "Spark SQL"
```sql
-- Delete rows
DELETE FROM demo.mytable WHERE id = 5;
```
=== "PySpark"
```python
# Delete rows
spark.sql("DELETE FROM demo.mytable WHERE id = 5")
```
Create Secondary Table
=== "Spark SQL"
```sql
-- Create secondary table (we will use this as the source for merge)
CREATE TABLE demo.srctable (id INT, desc STRING)
USING delta
LOCATION '<LOCATION>';
-- Example location:
-- LOCATION '/tmp/tables/srctable';
-- Insert new rows
INSERT INTO demo.srctable VALUES (3, "updated");
INSERT INTO demo.srctable VALUES (4, "inserted");
```
=== "PySpark"
```python
# Create secondary table (we will use this as the source for merge)
spark.sql("""
CREATE TABLE demo.srctable (id INT, desc STRING)
USING delta
LOCATION '<LOCATION>'
""")
# Example location:
# LOCATION '/tmp/tables/srctable';
# Insert new rows
spark.sql("INSERT INTO demo.srctable VALUES (3, 'updated')")
spark.sql("INSERT INTO demo.srctable VALUES (4, 'inserted')")
```
Merge Command
=== "Spark SQL"
```sql
-- Merge
MERGE INTO demo.mytable as target
USING demo.srctable as source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
;
-- Check results
SELECT * FROM demo.mytable;
```
=== "PySpark"
```python
# Merge
spark.sql("""
MERGE INTO demo.mytable AS target
USING demo.srctable AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
""")
# Check results
spark.sql("SELECT * FROM demo.mytable").show()
```
With the output similar to:

```
+---+--------+
| id|    desc|
+---+--------+
|  3| updated|
|  4|inserted|
|  1|  test 1|
|  2|  test 2|
+---+--------+
```
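The same merge can also be expressed through the `DeltaTable` Python API that ships with delta-spark. This is a sketch, assuming the PySpark shell session above and that `DeltaTable.forName` can resolve the UC table through the configured catalog:

```python
from delta.tables import DeltaTable

# Target table as a DeltaTable handle, source table as a DataFrame
target = DeltaTable.forName(spark, "demo.mytable")
source = spark.table("demo.srctable")

(
    target.alias("target")
    .merge(source.alias("source"), "target.id = source.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# Check results
spark.sql("SELECT * FROM demo.mytable").show()
```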
Drop Table
=== "Spark SQL"
```sql
-- Drop tables
DROP TABLE demo.srctable;
-- Check results
SHOW TABLES IN demo;
```
=== "PySpark"
```python
# Drop tables
spark.sql("DROP TABLE demo.srctable")
# Check results
spark.sql("SHOW TABLES IN default").show()
```
!!! warning "Note"

    This action will only drop the table from UC; it will not remove the data from the file system.
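If you also want to remove the underlying files of the dropped external table, delete them yourself at the location you chose when creating it. A hypothetical sketch for the local example path used above:

```python
import shutil

# Hypothetical cleanup: replace the path with the LOCATION used for demo.srctable
shutil.rmtree("/tmp/tables/srctable", ignore_errors=True)
```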