docs(iceberg): add spark-sql samples (#12565)
chakru-r authored Feb 20, 2025
1 parent 5d45358 commit 9e18fa0
Showing 1 changed file with 145 additions and 26 deletions.
171 changes: 145 additions & 26 deletions docs/iceberg-catalog.md
import FeatureAvailability from '@site/src/components/FeatureAvailability';
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

# DataHub Iceberg Catalog

Before starting, ensure you have:

```
DH_ICEBERG_DATA_ROOT="s3://your-bucket/path"

```
4. If using pyiceberg, configure it to use your local DataHub instance using one of its supported configuration methods. For example, create `~/.pyiceberg.yaml` with:
```yaml
catalog:
local_datahub:
uri: http://localhost:8080/iceberg
warehouse: arctic_warehouse
```
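
As a quick connectivity check, here is a minimal sketch (assuming the `arctic_warehouse` warehouse from step 1 below has already been provisioned and your local DataHub CLI/SDK is configured with a token) that loads this catalog entry the same way the later snippets do:

```python
from datahub.ingestion.graph.client import get_default_graph
from pyiceberg.catalog import load_catalog

# Reuse the token from your local DataHub client configuration.
graph = get_default_graph()

# "local_datahub" is the catalog entry defined in ~/.pyiceberg.yaml above.
catalog = load_catalog("local_datahub", warehouse="arctic_warehouse", token=graph.config.token)
print(catalog.list_namespaces())
```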

Note:
The Python code snippets in this tutorial are based on the code available in the `metadata-ingestion/examples/iceberg` folder of the DataHub repository. These snippets require `pyiceberg[duckdb] >= 0.8.1` to be installed.
The Spark examples were tested with Spark 3.5.3 (Scala 2.12).

### Required AWS Permissions

Note: These permissions must be granted for the specific S3 bucket and path prefix.

### 1. Provision a Warehouse

Create an Iceberg warehouse in DataHub:

<Tabs>
<TabItem value="cli" label="CLI" default>

```
datahub iceberg create -w arctic_warehouse -d $DH_ICEBERG_DATA_ROOT -i $DH_ICEBERG_CLIENT_ID --client_secret $DH_ICEBERG_CLIENT_SECRET --region "us-east-1" --role $DH_ICEBERG_AWS_ROLE
```
</TabItem>
<TabItem value="python" label="Python (pyiceberg)">

```python
# File: provision_warehouse.py
import os

from constants import warehouse
os.system(
    f"datahub iceberg create --warehouse {warehouse} --data_root $DH_ICEBERG_DATA_ROOT/{warehouse} --client_id $DH_ICEBERG_CLIENT_ID --client_secret $DH_ICEBERG_CLIENT_SECRET --region 'us-east-1' --role $DH_ICEBERG_AWS_ROLE"
)
```
</TabItem>
</Tabs>
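
To confirm the warehouse was provisioned, you can list the warehouses registered in DataHub (the same command is covered in the CLI reference later on this page):

```
datahub iceberg list
```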

After provisioning the warehouse, ensure your DataHub user has the following privileges on the Data Platform Instance resource type (these privileges were introduced with Iceberg support):
- `DATA_MANAGE_VIEWS_PRIVILEGE`
You can grant these privileges through the DataHub UI under the Policies section.

You can create Iceberg tables with a defined schema using Spark SQL or PyIceberg. Here's an example creating a ski resort metrics table:

<Tabs>
<TabItem value="spark" label="spark-sql" default>

Connect to the DataHub Iceberg Catalog using Spark SQL by defining `$GMS_HOST`, `$GMS_PORT`, `$WAREHOUSE`, and `$USER_PAT` (the DataHub Personal Access Token used to authenticate to the catalog).
When DataHub is running locally, set `GMS_HOST` to `localhost` and `GMS_PORT` to `8080`.
For this example, set `WAREHOUSE` to `arctic_warehouse`.

```commandline
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=rest \
--conf spark.sql.catalog.local.uri=http://$GMS_HOST:$GMS_PORT/iceberg/ \
--conf spark.sql.catalog.local.warehouse=$WAREHOUSE \
--conf spark.sql.catalog.local.token=$USER_PAT \
--conf spark.sql.catalog.local.rest-metrics-reporting-enabled=false \
--conf spark.sql.catalog.local.header.X-Iceberg-Access-Delegation=vended-credentials \
--conf spark.sql.defaultCatalog=local
```

Then run the following SQL via spark-sql:

```sql
CREATE NAMESPACE alpine_db;
CREATE TABLE alpine_db.ski_resorts (
resort_id BIGINT NOT NULL COMMENT 'Unique identifier for each ski resort',
resort_name STRING NOT NULL COMMENT 'Official name of the ski resort',
daily_snowfall BIGINT COMMENT 'Amount of new snow in inches during the last 24 hours',
conditions STRING COMMENT 'Current snow conditions description',
last_updated TIMESTAMP COMMENT 'Timestamp of when the snow report was last updated'
);
```
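
To confirm the namespace and table were created, a few standard Spark SQL checks (nothing DataHub-specific) can be run in the same spark-sql session:

```sql
SHOW NAMESPACES;
SHOW TABLES IN alpine_db;
DESCRIBE TABLE alpine_db.ski_resorts;
```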

</TabItem>
<TabItem value="python" label="Python (pyiceberg)">

```python
from constants import namespace, table_name, warehouse

# `catalog` and `schema` are set up earlier in the full example (see metadata-ingestion/examples/iceberg)
catalog.create_namespace(namespace)
catalog.create_table(f"{namespace}.{table_name}", schema)
```

</TabItem>
</Tabs>

### 3. Write Data

<Tabs>
<TabItem value="spark" label="spark-sql" default>

```sql
INSERT INTO alpine_db.ski_resorts (resort_id, resort_name, daily_snowfall, conditions, last_updated)
VALUES
(1, 'Snowpeak Resort', 12, 'Powder', CURRENT_TIMESTAMP()),
(2, 'Alpine Valley', 8, 'Packed', CURRENT_TIMESTAMP()),
(3, 'Glacier Heights', 15, 'Fresh Powder', CURRENT_TIMESTAMP());
```
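
As a quick sanity check before reading the data back, a simple count in the same session should report the three rows inserted above:

```sql
SELECT COUNT(*) AS resort_count FROM alpine_db.ski_resorts;
```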

</TabItem>
<TabItem value="python" label="Python (pyiceberg)">

You can write data to your Iceberg table using PyArrow. Note that the PyArrow schema must match the Iceberg schema exactly:

```python
from constants import namespace, table_name, warehouse
from pyiceberg.catalog import load_catalog
from datahub.ingestion.graph.client import get_default_graph

import pyarrow as pa
from datetime import datetime

graph = get_default_graph()
catalog = load_catalog("local_datahub", warehouse=warehouse, token=graph.config.token)

# Create PyArrow schema to match Iceberg schema
pa_schema = pa.schema([
("resort_id", pa.int64(), False), # False means not nullable
Expand Down Expand Up @@ -202,17 +281,32 @@ table.overwrite(sample_data)
table.refresh()
```

</TabItem>
</Tabs>

### 4. Read Data

<Tabs>
<TabItem value="spark" label="spark-sql" default>

```sql
SELECT * FROM alpine_db.ski_resorts;
```

</TabItem>
<TabItem value="python" label="Python (pyiceberg)">

Read data from the Iceberg table using PyIceberg's DuckDB integration:

```python
from pyiceberg.catalog import load_catalog
from constants import namespace, table_name, warehouse

from datahub.ingestion.graph.client import get_default_graph

# Get DataHub graph client for authentication
graph = get_default_graph()


catalog = load_catalog("local_datahub", warehouse=warehouse, token=graph.config.token)
table = catalog.load_table(f"{namespace}.{table_name}")
con = table.scan().to_duckdb(table_name=table_name)
for row in con.execute(f"SELECT * FROM {table_name}").fetchall():
    print(row)
```

</TabItem>

</Tabs>

## Reference Information

When setting up your AWS role, you'll need to configure a trust policy.
```
--conf spark.sql.catalog.local.default-namespace=<default-namespace>
```

### Public access of Iceberg Tables

It is possible to enable public read-only access to specific Iceberg tables if needed.
Enabling public access requires the following steps:
1. Ensure that the DATA ROOT folder in S3 has a public read access policy set, so that files with that prefix can be read without AWS credentials.
2. Update the GMS configuration to enable public access by running the GMS service with the following environment variables set:
   - Set the env var `ENABLE_PUBLIC_READ` to `true` to enable the capability. If unset, this defaults to `false`.
   - Set the env var `PUBLICLY_READABLE_TAG` to the Tag name that indicates public access when applied to an Iceberg Dataset. If unset, this defaults to the tag `PUBLICLY_READABLE`.

   Alternatively, these can be set in `metadata-service/configuration/src/main/resources/application.yaml` under the `icebergCatalog` key. The defaults are populated under that key.
3. Once GMS is started with the public read capability enabled, apply the Tag defined for public access to each Dataset that should be accessible without authentication (a scripted example using the Python SDK follows below).
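
As a scripted alternative to tagging datasets through the UI, here is a minimal sketch using the DataHub Python SDK. It assumes the default `PUBLICLY_READABLE` tag name, the `iceberg` platform, and the default `PROD` environment; adjust the dataset URN to match how your warehouse registers datasets, and note that emitting `GlobalTagsClass` replaces any tags already present on the dataset:

```python
from datahub.emitter.mce_builder import make_dataset_urn, make_tag_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import get_default_graph
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

graph = get_default_graph()

# Hypothetical URN for the example table; adjust platform instance/env to your setup.
dataset_urn = make_dataset_urn(platform="iceberg", name="alpine_db.ski_resorts", env="PROD")

# PUBLICLY_READABLE is the default tag name configured via PUBLICLY_READABLE_TAG.
tags = GlobalTagsClass(tags=[TagAssociationClass(tag=make_tag_urn("PUBLICLY_READABLE"))])

# Caution: this overwrites any existing tags on the dataset.
graph.emit_mcp(MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=tags))
```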

To access tables that have public access, start spark-sql with the following settings:

```commandline
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=rest \
--conf spark.sql.catalog.local.uri=http://${GMS_HOST}:${GMS_PORT}/public-iceberg/ \
--conf spark.sql.catalog.local.warehouse=arctic_warehouse \
--conf spark.sql.catalog.local.header.X-Iceberg-Access-Delegation=false \
--conf spark.sql.catalog.local.rest-metrics-reporting-enabled=false \
--conf spark.sql.catalog.local.client.region=us-east-1 \
--conf spark.sql.catalog.local.client.credentials-provider=software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider \
--conf spark.sql.defaultCatalog=local \
--conf spark.sql.catalog.local.default-namespace=alpine_db
```

Note the specific differences from authenticated access:
- The REST Catalog URI is gms_host:port/<b>public-iceberg/</b> instead of `iceberg/`
- The DATA ROOT AWS S3 region is specified via `spark.sql.catalog.local.client.region`
- The `X-Iceberg-Access-Delegation` header is set to <b>`false`</b> instead of `vended-credentials`
- The `credentials-provider` is set to `software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider`, and no Personal Access Token is provided

In such unauthenticated sessions, attempts to access tables that are not publicly readable fail with a `NoSuchTableException` rather than an authorization failure.

### DataHub Iceberg CLI

The DataHub CLI provides several commands for managing Iceberg warehouses:

1. List Warehouses:
```
datahub iceberg list
```

2. Update Warehouse Configuration:
```
datahub iceberg update \
-w $WAREHOUSE_NAME \
--role DH_ICEBERG_AWS_ROLE
```

3. Delete Warehouse:
```
datahub iceberg delete -w $WAREHOUSE_NAME
```
When migrating from another Iceberg catalog, you can register existing Iceberg tables.

Example of registering an existing table:
```sql
call system.register_table('myTable', 's3://my-s3-bucket/my-data-root/myNamespace/myTable/metadata/00000-f9dbba67-df4f-4742-9ba5-123aa2bb4076.metadata.json');
-- Read from newly registered table
select * from myTable;
```

### Security and Permissions
