Unity Catalog Apache Spark™ Integration

This page explains how to create Unity Catalog tables with Apache Spark™.

Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

Integrating Apache Spark with Unity Catalog offers significant advantages over traditional catalog solutions. Unity Catalog provides unified governance across both data and AI assets, fine-grained access control down to the column level, automated data lineage tracking, and seamless interoperability with various lakehouse formats and compute engines. It enables centralized metadata management, simplified data discovery, and enhanced security. The credential vending capability of Unity Catalog is particularly noteworthy as it allows Apache Spark to securely access data stored in Unity Catalog through a controlled mechanism.

  • Neatly organizing data in tables and volumes within the Unity Catalog hierarchy makes it much easier to write Spark code.
  • It decouples business logic from file paths, as shown in the sketch after this list.
  • It provides easy access to different file formats without end users needing to know how the data is stored.
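
For example, once a table is registered in Unity Catalog, Spark code can refer to it by name rather than by a hard-coded storage path. A minimal PySpark sketch (the path shown is purely illustrative):

```python
# Read a Unity Catalog table by name (catalog.schema.table);
# Spark resolves the storage location and format through Unity Catalog.
df = spark.table("unity.default.marksheet")

# The equivalent path-based read hard-codes the format and the
# physical location into the application code.
df_by_path = spark.read.format("delta").load("/path/to/marksheet")
```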

!!! warning "Prerequisites"

    For Apache Spark and Delta Lake to work together with Unity Catalog, you will need at least Apache Spark 3.5.3 and Delta Lake 3.2.1.

Download and Configure Unity Catalog for Apache Spark

The following steps are required to download and configure Unity Catalog for Apache Spark.

Download Apache Spark

Download the latest version of Apache Spark (>= 3.5.3) using the following commands:

```bash
curl -O https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar xzf spark-3.5.3-bin-hadoop3.tgz
```

[Optional] Configure server properties for cloud storage

To have Unity Catalog work with cloud object storage as the storage location for tables, add the following configuration to `etc/conf/server.properties`:

=== "AWS S3"

```bash
## S3 Storage Config (Multiple configs can be added by incrementing the index)
s3.bucketPath.0=<S3_BUCKET>
s3.region.0=<S3_REGION>
s3.awsRoleArn.0=<S3_ROLE>
# Optional (If blank, it will use DefaultCredentialsProviderChain)
s3.accessKey.0=<SECRET>
s3.secretKey.0=<SECRET>
```

=== "Azure ADLSgen2"

```bash
## ADLS Storage Config (Multiple configs can be added by incrementing the index)
adls.storageAccountName.0=<ADLS_STORAGEACCOUNTNAME>
adls.tenantId.0=<ADLS_TENANTID>
adls.clientId.0=<ADLS_CLIENTID>
adls.clientSecret.0=<SECRET>
```

=== "Google Cloud Storage"

```bash
## GCS Storage Config (Multiple configs can be added by incrementing the index)
gcs.bucketPath.0=<GCS_BUCKET>
# Optional (If blank, it will use Default Application chain to find credentials)
gcs.jsonKeyFilePath.0=/path/to/<SECRET>/gcp-key-uc-testing.json
```

[Optional] Restart Unity Catalog Server

If the UC server is already running, restart it so it picks up the cloud storage server properties.

```bash
cd unitycatalog/
bin/start-uc-server
```
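
Before connecting Spark, you can verify that the server is reachable by querying its REST API. A minimal sketch using Python's `requests`, assuming the default local endpoint and the catalog-listing path:

```python
import requests

# List catalogs on the local Unity Catalog server (default port 8080).
resp = requests.get("http://localhost:8080/api/2.1/unity-catalog/catalogs")
resp.raise_for_status()
print(resp.json())  # should include the built-in 'unity' catalog
```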

Working with Unity Catalog Tables with Apache Spark and Delta Lake Locally

Let’s start by running some Spark SQL queries against your local UC in the Spark SQL shell (`bin/spark-sql`) or PySpark shell (`bin/pyspark`), launched from a terminal in your Apache Spark 3.5.3 folder.

=== "Spark SQL"

```bash
bin/spark-sql --name "local-uc-test" \
    --master "local[*]" \
    --packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"
```

=== "PySpark"

```bash
bin/pyspark --name "local-uc-test" \
    --master "local[*]" \
    --packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"
```

!!! tip "Tip"

    Initially, this may take a few minutes to run while the necessary dependencies are downloaded. Afterwards, you can run some quick commands in the Spark SQL shell to see your UC assets.

Notice the following packages (`--packages`) and configurations (`--conf`); the sketch after this list shows the same settings applied in a standalone script:

  • `--packages` points to the delta-spark and unitycatalog-spark packages; update the version numbers to match the versions you are using.
  • `spark.sql.catalog.unity.uri` points to your local development UC instance.
  • `spark.sql.catalog.unity.token` is empty, indicating there is no authentication; refer to auth for more information.
  • `spark.sql.defaultCatalog=unity` sets the default catalog.
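
If you prefer a standalone script over the interactive shells, the same settings can be applied when constructing the SparkSession. A minimal sketch that mirrors the shell configuration above (the app name is arbitrary; adjust the package versions to the ones you use):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("local-uc-test")
    .master("local[*]")
    # Same packages and catalog settings as the shell examples above
    .config(
        "spark.jars.packages",
        "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity.uri", "http://localhost:8080")
    .config("spark.sql.catalog.unity.token", "")
    .config("spark.sql.defaultCatalog", "unity")
    .getOrCreate()
)

spark.sql("SHOW SCHEMAS").show()
```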

??? note "Three-part and two-part naming conventions"

    ![](https://cdn.prod.website-files.com/66954b344e907bd91f1c8027/66e2bafe16edde34db6395f2_AD_4nXdgqGKSeR2abf7zutk0fiALAs6vejg6EgUDgD_Ud9Xjy7nNkapMePCNH0zJw9Wv0uh6LYn7vlGYrRn4H74G9d0CouV0PWKsUTGkjfBKM5y4Br64B2P5Eapv97bCw0swV4pddsemaWU2zyYYlkKT6Ymxu2YO.png)

    As noted in [Unity Catalog 101](https://www.unitycatalog.io/blogs/unity-catalog-oss), UC has a three-part naming convention of [`catalog`].[`schema`].[`asset`]. In the following examples, you can use the three-part notation such as `SELECT * FROM unity.default.marksheet;` or the two-part notation `SELECT * FROM default.marksheet;` as the `defaultCatalog` is already configured.

[Optional] Running Spark SQL for Cloud Object Stores

If you would like to run this against cloud object storage, use the following versions of the `bin/spark-sql` shell command.

=== "AWS S3"

```bash
bin/spark-sql --name "s3-uc-test" \
    --master "local[*]" \
    --packages "org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"
```

=== "Azure ADLSgen2"

```bash
bin/spark-sql --name "azure-uc-test" \
    --master "local[*]" \
    --packages "org.apache.hadoop:hadoop-azure:3.3.6,io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"
```

=== "Google Cloud Storage"

```bash
bin/spark-sql --name "gcs-uc-test" \
    --master "local[*]" \
    --jars "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/3.0.2/gcs-connector-3.0.2-shaded.jar" \
    --packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" \
    --conf "spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"
```
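
Once the server properties include your bucket, creating a table on cloud storage works the same way as creating one locally. A sketch for the S3 case, run from one of the shells above (the table name and path are placeholders, not part of the official walkthrough):

```python
# Create a Delta table whose files live in the configured S3 bucket;
# Unity Catalog vends temporary credentials to Spark for this location.
spark.sql("""
CREATE TABLE default.my_s3_table (id INT, desc STRING)
USING delta
LOCATION 's3://<S3_BUCKET>/tables/my_s3_table'
""")
```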

Using Spark SQL to query Unity Catalog schemas and tables

Let’s start by running some quick commands from the Spark SQL and PySpark shells.

The following SHOW SCHEMAS command shows the default schema that is included in the initial UC configuration.

=== "Spark SQL"

```sql
-- Show schemas (output = default)
SHOW SCHEMAS;

-- Show tables
SHOW TABLES IN default;
```

=== "PySpark"

```python
# Show schemas (output = default)
spark.sql("SHOW SCHEMAS").show()

# Show tables
spark.sql("SHOW TABLES IN default").show()
```

with output similar to:

```
+---------+-----------------+-----------+
|namespace|        tableName|isTemporary|
+---------+-----------------+-----------+
|  default|        marksheet|      false|
|  default|marksheet_uniform|      false|
|  default|          numbers|      false|
|  default|   user_countries|      false|
+---------+-----------------+-----------+
```
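
To see where Unity Catalog stores a table and which format it uses, you can also describe it (a quick sketch; the exact output columns vary by Spark version):

```python
# Show column, format, and location metadata for the table
spark.sql("DESCRIBE TABLE EXTENDED default.marksheet").show(truncate=False)
```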

Let’s query the first five rows of the marksheet table.

=== "Spark SQL"

```sql
SELECT * FROM default.marksheet LIMIT 5;
```

=== "PySpark"

```python
spark.sql("SELECT * FROM default.marksheet LIMIT 5;").show()
```

With output similar to the following:

```
+---+----------+-----+
| id|      name|marks|
+---+----------+-----+
|  1|nWYHawtqUw|  930|
|  2|uvOzzthsLV|  166|
|  3|WIAehuXWkv|  170|
|  4|wYCSvnJKTo|  709|
|  5|VsslXsUIDZ|  993|
+---+----------+-----+
```

Running CRUD Operations on a Unity Catalog Table

Let’s extend this example by executing various CRUD operations on our UC tables.

Create New Schema

=== "Spark SQL"

```sql
-- Create new schema
CREATE SCHEMA demo;

-- Should now show two schemas: default and demo
SHOW SCHEMAS;
```

=== "PySpark"

```python
# Create new schema
spark.sql("CREATE SCHEMA demo")

# Should now show two schemas: default and demo
spark.sql("SHOW SCHEMAS").show()
```

Create New Table

=== "Spark SQL"

```sql
-- Create a new table
CREATE TABLE demo.mytable (id INT, desc STRING) 
USING delta 
LOCATION '<LOCATION>';
-- Example location:
-- LOCATION '/tmp/tables/mytable';
```

=== "PySpark"

```python
# Create a new table
spark.sql("""
CREATE TABLE demo.mytable (id INT, desc STRING)
USING delta
LOCATION '<LOCATION>'
""")
# Example location:
# LOCATION '/tmp/tables/mytable'
```

Insert New Rows into Table

=== "Spark SQL"

```sql
-- Insert new rows
INSERT INTO demo.mytable VALUES (1, "test 1");
INSERT INTO demo.mytable VALUES (2, "test 2");
INSERT INTO demo.mytable VALUES (3, "test 3");
INSERT INTO demo.mytable VALUES (4, "test 4");

-- Read table
SELECT * FROM demo.mytable;
```

=== "PySpark"

```python
# Insert new rows
spark.sql("INSERT INTO demo.mytable VALUES (1, 'test 1')")
spark.sql("INSERT INTO demo.mytable VALUES (2, 'test 2')")
spark.sql("INSERT INTO demo.mytable VALUES (3, 'test 3')")
spark.sql("INSERT INTO demo.mytable VALUES (4, 'test 4')")

# Read table
spark.sql("SELECT * FROM demo.mytable").show()
```
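
Besides `spark.sql`, the table is also accessible through the DataFrame API, which is convenient when mixing SQL with programmatic transformations (a small sketch):

```python
# Read the Unity Catalog table as a DataFrame and apply a filter
df = spark.table("demo.mytable")
df.filter("id > 2").show()
```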

Update Row in Table

=== "Spark SQL"

```sql
-- Update row in table
UPDATE demo.mytable SET id = 5 WHERE id = 4;
```

=== "PySpark"

```python
# Update row in table
spark.sql("UPDATE demo.mytable SET id = 5 WHERE id = 4")
```

Delete Row from Table

=== "Spark SQL"

```sql
-- Delete rows
DELETE FROM demo.mytable WHERE id = 5;
```

=== "PySpark"

```python
# Delete rows
spark.sql("DELETE FROM demo.mytable WHERE id = 5")
```

Merge mytable with srctable

Create Secondary Table

=== "Spark SQL"

```sql
-- Create secondary table (we will use this as the source for merge)
CREATE TABLE demo.srctable (id INT, desc STRING) 
USING delta
LOCATION '<LOCATION>';
-- Example location:
-- LOCATION '/tmp/tables/srctable';

-- Insert new rows
INSERT INTO demo.srctable VALUES (3, "updated");
INSERT INTO demo.srctable VALUES (4, "inserted");
```

=== "PySpark"

```python
# Create secondary table (we will use this as the source for merge)
spark.sql("""
CREATE TABLE demo.srctable (id INT, desc STRING)
USING delta
LOCATION '<LOCATION>'
""")
# Example location:
# LOCATION '/tmp/tables/srctable';

# Insert new rows
spark.sql("INSERT INTO demo.srctable VALUES (3, 'updated')")
spark.sql("INSERT INTO demo.srctable VALUES (4, 'inserted')")
```

Merge Command

=== "Spark SQL"

```sql
-- Merge
MERGE INTO demo.mytable as target
USING demo.srctable as source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *
;

-- Check results
SELECT * FROM demo.mytable;
```

=== "PySpark"

```python
# Merge
spark.sql("""
MERGE INTO demo.mytable AS target
USING demo.srctable AS source
    ON target.id = source.id
WHEN MATCHED THEN 
    UPDATE SET *
WHEN NOT MATCHED THEN 
    INSERT *
""")

# Check results
spark.sql("SELECT * FROM demo.mytable").show()
```

The result should look similar to:

```
3       updated
4       inserted
1       test 1
2       test 2
```

Drop Table

=== "Spark SQL"

```sql

-- Drop table
DROP TABLE demo.srctable;

-- Check results
SHOW TABLES IN demo;
```

=== "PySpark"

```python
# Drop tables
spark.sql("DROP TABLE demo.srctable")

# Check results
spark.sql("SHOW TABLES IN default").show()
```

!!! warning "Note"

    This action only drops the table from UC; it does not remove the underlying data from the file system.
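
A quick way to confirm this is to list the table's storage location after the drop; the Delta files are still on disk. A sketch, assuming you used the example location `/tmp/tables/srctable` when creating the table:

```python
import os

# The Delta log and data files remain on disk after DROP TABLE,
# because only the catalog entry was removed from Unity Catalog.
print(os.listdir("/tmp/tables/srctable"))
```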