feat(iceberg-rest): implement iceberg REST catalog api (#12500)
Co-authored-by: ksrinath <[email protected]>
Co-authored-by: Chakravarthy Racharla <[email protected]>
3 people authored Jan 30, 2025
1 parent ddb3db9 commit f527c5e
Showing 63 changed files with 6,552 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -91,7 +91,8 @@ private SearchUtils() {}
EntityType.DATA_PRODUCT,
EntityType.NOTEBOOK,
EntityType.BUSINESS_ATTRIBUTE,
EntityType.SCHEMA_FIELD);
EntityType.SCHEMA_FIELD,
EntityType.DATA_PLATFORM_INSTANCE);

/** Entities that are part of autocomplete by default in Auto Complete Across Entities */
public static final List<EntityType> AUTO_COMPLETE_ENTITY_TYPES =
3 changes: 3 additions & 0 deletions datahub-web-react/src/graphql/search.graphql
@@ -482,6 +482,9 @@ fragment searchResultsWithoutSchemaField on Entity {
}
}
}
... on DataPlatformInstance {
...dataPlatformInstanceFields
}
... on Role {
id
properties {
6 changes: 6 additions & 0 deletions docs/advanced/mcp-mcl.md
@@ -210,6 +210,7 @@ A writer can specify that the aspect must NOT have been modified after a specifi
`If-Modified-Since`
A writer can specify that the aspect must have been modified after a specific time, following [If-Modified-Since](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Modified-Since) http headers.


#### Change Types: [`CREATE`, `CREATE_ENTITY`]

Another form of conditional writes which considers the existence of an aspect or entity uses the following Change Types.
@@ -221,3 +222,8 @@ Another form of conditional writes which considers the existence of an aspect or
By default, a validation exception is thrown if the `CREATE`/`CREATE_ENTITY` constraint is violated. If the write operation
should instead be dropped silently rather than treated as an error, add the header `If-None-Match: *` to the MCP.
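
A minimal Java sketch of such a proposal, using a placeholder URN and aspect name:

```java
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.StringMap;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.mxe.MetadataChangeProposal;

class CreateIfAbsentExample {
  // Builds a CREATE_ENTITY proposal that is silently dropped if the entity already exists.
  static MetadataChangeProposal createIfAbsent() {
    MetadataChangeProposal mcp = new MetadataChangeProposal();
    mcp.setEntityUrn(UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)"));
    mcp.setEntityType("dataset");
    mcp.setAspectName("datasetKey"); // placeholder; set the aspect payload as usual
    mcp.setChangeType(ChangeType.CREATE_ENTITY);

    StringMap headers = new StringMap();
    headers.put("If-None-Match", "*"); // drop instead of failing when the entity already exists
    mcp.setHeaders(headers);
    return mcp;
  }
}
```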

### Synchronous Elasticsearch Updates

Writes to Elasticsearch are asynchronous by default. A writer can add the custom header
`X-DataHub-Sync-Index-Update` to the MCP `headers` with the value `true` to request a synchronous
Elasticsearch update for specific MCPs that benefit from it.
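
A minimal Java sketch of setting this header on a proposal (the helper class and method names are illustrative):

```java
import com.linkedin.data.template.StringMap;
import com.linkedin.mxe.MetadataChangeProposal;

class SyncIndexUpdateExample {
  // Asks GMS to apply the search index update for this MCP synchronously instead of via Kafka.
  static MetadataChangeProposal withSyncIndexUpdate(MetadataChangeProposal mcp) {
    StringMap headers = new StringMap();
    headers.put("X-DataHub-Sync-Index-Update", "true"); // Constants.SYNC_INDEX_UPDATE_HEADER_NAME
    mcp.setHeaders(headers);
    return mcp;
  }
}
```
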
7 changes: 7 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
@@ -10,6 +10,9 @@ public class Constants {
public static final String INTERNAL_DELEGATED_FOR_ACTOR_HEADER_NAME = "X-DataHub-Delegated-For";
public static final String INTERNAL_DELEGATED_FOR_ACTOR_TYPE = "X-DataHub-Delegated-For-";

// Use on a specific MCP to request a synchronous index update, avoiding Kafka lag.
public static final String SYNC_INDEX_UPDATE_HEADER_NAME = "X-DataHub-Sync-Index-Update";

public static final String URN_LI_PREFIX = "urn:li:";
public static final String DATAHUB_ACTOR = "urn:li:corpuser:datahub"; // Super user.
public static final String SYSTEM_ACTOR =
@@ -103,6 +106,7 @@ public class Constants {
public static final String FORM_ENTITY_NAME = "form";
public static final String RESTRICTED_ENTITY_NAME = "restricted";
public static final String BUSINESS_ATTRIBUTE_ENTITY_NAME = "businessAttribute";
public static final String PLATFORM_RESOURCE_ENTITY_NAME = "platformResource";

/** Aspects */
// Common
@@ -211,6 +215,9 @@ public class Constants {
public static final String DATA_PLATFORM_INSTANCE_PROPERTIES_ASPECT_NAME =
"dataPlatformInstanceProperties";

// PlatformResource
public static final String PLATFORM_RESOURCE_INFO_ASPECT_NAME = "platformResourceInfo";

// ML Feature
public static final String ML_FEATURE_KEY_ASPECT_NAME = "mlFeatureKey";
public static final String ML_FEATURE_PROPERTIES_ASPECT_NAME = "mlFeatureProperties";
Original file line number Diff line number Diff line change
@@ -561,8 +561,7 @@ public static DisjunctivePrivilegeGroup buildDisjunctivePrivilegeGroup(
return buildDisjunctivePrivilegeGroup(lookupAPIPrivilege(apiGroup, apiOperation, entityType));
}

@VisibleForTesting
static DisjunctivePrivilegeGroup buildDisjunctivePrivilegeGroup(
public static DisjunctivePrivilegeGroup buildDisjunctivePrivilegeGroup(
final Disjunctive<Conjunctive<PoliciesConfig.Privilege>> privileges) {
return new DisjunctivePrivilegeGroup(
privileges.stream()
3 changes: 3 additions & 0 deletions metadata-ingestion/examples/iceberg/constants.py
@@ -0,0 +1,3 @@
warehouse = "arctic_warehouse"
namespace = "alpine_db"
table_name = "resort_metrics"
108 changes: 108 additions & 0 deletions metadata-ingestion/examples/iceberg/create_table.py
@@ -0,0 +1,108 @@
from datetime import datetime

import pyarrow as pa
import pyiceberg
from constants import namespace, table_name, warehouse
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType, TimestampType

from datahub.ingestion.graph.client import get_default_graph

# Define a more comprehensive schema for ski resort data
schema = Schema(
NestedField(
field_id=1,
name="resort_id",
field_type=LongType(),
required=True,
doc="Unique identifier for each ski resort",
initial_default=None,
),
NestedField(
field_id=2,
name="resort_name",
field_type=StringType(),
required=True,
doc="Official name of the ski resort",
initial_default=None,
),
NestedField(
field_id=3,
name="daily_snowfall",
field_type=LongType(),
required=False,
doc="Amount of new snow in inches during the last 24 hours. Null if no measurement available",
initial_default=0,
),
NestedField(
field_id=4,
name="conditions",
field_type=StringType(),
required=False,
doc="Current snow conditions description (e.g., 'Powder', 'Packed Powder', 'Groomed'). Null if not reported",
initial_default=None,
),
NestedField(
field_id=5,
name="last_updated",
field_type=TimestampType(),
required=False,
doc="Timestamp of when the snow report was last updated",
initial_default=None,
),
)

# Load the catalog with new warehouse name
graph = get_default_graph()
catalog = load_catalog("local_datahub", warehouse=warehouse, token=graph.config.token)

# Create namespace (database)
try:
catalog.create_namespace(namespace)
except Exception as e:
print(f"Namespace creation error (might already exist): {e}")

full_table_name = f"{namespace}.{table_name}"
try:
catalog.create_table(full_table_name, schema)
except pyiceberg.exceptions.TableAlreadyExistsError:
print(f"Table {full_table_name} already exists")

# Create sample data with explicit PyArrow schema to match required fields
pa_schema = pa.schema(
[
("resort_id", pa.int64(), False), # False means not nullable
("resort_name", pa.string(), False), # False means not nullable
("daily_snowfall", pa.int64(), True),
("conditions", pa.string(), True),
("last_updated", pa.timestamp("us"), True),
]
)
# Create sample data
sample_data = pa.Table.from_pydict(
{
"resort_id": [1, 2, 3],
"resort_name": ["Snowpeak Resort", "Alpine Valley", "Glacier Heights"],
"daily_snowfall": [12, 8, 15],
"conditions": ["Powder", "Packed", "Fresh Powder"],
"last_updated": [
pa.scalar(datetime.now()),
pa.scalar(datetime.now()),
pa.scalar(datetime.now()),
],
},
schema=pa_schema,
)

# Write data to table
table = catalog.load_table(full_table_name)
table.overwrite(sample_data)

table.refresh()
# Read and verify data
con = table.scan().to_duckdb(table_name=f"{table_name}")
print("\nResort Metrics Data:")
print("-" * 50)
for row in con.execute(f"SELECT * FROM {table_name}").fetchall():
print(row)
10 changes: 10 additions & 0 deletions metadata-ingestion/examples/iceberg/drop_table.py
@@ -0,0 +1,10 @@
from constants import namespace, table_name, warehouse
from pyiceberg.catalog import load_catalog

# Load the catalog
from datahub.ingestion.graph.client import get_default_graph

graph = get_default_graph()
catalog = load_catalog("local_datahub", warehouse=warehouse, token=graph.config.token)
# Drop the Iceberg table
catalog.drop_table(f"{namespace}.{table_name}")
218 changes: 218 additions & 0 deletions metadata-ingestion/examples/iceberg/folder_operations.py
@@ -0,0 +1,218 @@
"""
This script is designed to manage and clean up contents in an S3 bucket, specifically targeting orphaned files and folders.
It provides functionality to list, delete, or simulate deletion of all objects under a specified S3 prefix using AWS assumed role credentials.
The script supports the following operations:
- Listing all files and folders under a specified S3 path.
- Deleting all contents under a specified S3 path.
- Performing a dry run to show what would be deleted without actually deleting the objects.
Environment variables required:
- DH_ICEBERG_AWS_ROLE: The ARN of the AWS role to assume.
- DH_ICEBERG_CLIENT_ID: The AWS client ID.
- DH_ICEBERG_CLIENT_SECRET: The AWS client secret.
Usage:
python folder_operations.py s3://bucket/prefix --list
python folder_operations.py s3://bucket/prefix --nuke
python folder_operations.py s3://bucket/prefix --dry-run
Arguments:
- s3_path: The S3 path to operate on (e.g., s3://bucket/prefix).
- --list: List all folders and files.
- --nuke: Delete all contents.
- --dry-run: Show what would be deleted without actually deleting.
- --region: AWS region (default: us-east-1).
Note: Only one action (--list, --nuke, or --dry-run) can be specified at a time.
"""

import argparse
import os
from datetime import datetime
from typing import Optional, Tuple

import boto3
from mypy_boto3_s3 import S3Client


def get_s3_client_with_role(
client_id: str,
client_secret: str,
role_arn: str,
region: str = "us-east-1",
session_name: str = "IcebergSession",
) -> Tuple[S3Client, datetime]: # type: ignore
"""
Create an S3 client with assumed role credentials.
"""
session = boto3.Session(
aws_access_key_id=client_id,
aws_secret_access_key=client_secret,
region_name=region,
)

sts_client = session.client("sts")

assumed_role_object = sts_client.assume_role(
RoleArn=role_arn, RoleSessionName=session_name
)

credentials = assumed_role_object["Credentials"]

s3_client: S3Client = boto3.client(
"s3",
region_name=region,
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_session_token=credentials["SessionToken"],
)

return s3_client, credentials["Expiration"]


def delete_s3_objects(
s3_client: S3Client, bucket_name: str, prefix: str, dry_run: bool = False
) -> None:
"""
Delete all objects under the specified prefix.
"""
paginator = s3_client.get_paginator("list_objects_v2")

for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
objects_to_delete = []
for obj in page.get("Contents", []):
objects_to_delete.append({"Key": obj["Key"]})
if dry_run:
print(f"Would delete: {obj['Key']}")
print(f" Size: {obj['Size'] / (1024 * 1024):.2f} MB")
print(f" Last Modified: {obj['LastModified']}")

if objects_to_delete and not dry_run:
s3_client.delete_objects(
Bucket=bucket_name,
Delete={"Objects": objects_to_delete}, # type: ignore
)
print(f"Deleted {len(objects_to_delete)} objects")


def list_s3_contents(
s3_path: str,
client_id: str,
client_secret: str,
role_arn: str,
region: str = "us-east-1",
delimiter: Optional[str] = None,
nuke: bool = False,
dry_run: bool = False,
) -> None:
"""
List or delete contents of an S3 path using assumed role credentials.
"""
if not s3_path.startswith("s3://"):
raise ValueError("S3 path must start with 's3://'")

bucket_name = s3_path.split("/")[2]
prefix = "/".join(s3_path.split("/")[3:])
if prefix and not prefix.endswith("/"):
prefix += "/"

s3_client, expiration = get_s3_client_with_role(
client_id=client_id,
client_secret=client_secret,
role_arn=role_arn,
region=region,
)

operation = "Deleting" if nuke else "Would delete" if dry_run else "Listing"
print(f"\n{operation} contents of {s3_path}")
print(f"Using role: {role_arn}")
print(f"Credentials expire at: {expiration}")
print("-" * 60)

if nuke or dry_run:
delete_s3_objects(s3_client, bucket_name, prefix, dry_run)
return

paginator = s3_client.get_paginator("list_objects_v2")

list_params = {"Bucket": bucket_name, "Prefix": prefix}
if delimiter:
list_params["Delimiter"] = delimiter

try:
pages = paginator.paginate(**list_params) # type: ignore
found_contents = False

for page in pages:
if delimiter and "CommonPrefixes" in page:
for common_prefix in page.get("CommonPrefixes", []):
found_contents = True
folder_name = common_prefix["Prefix"][len(prefix) :].rstrip("/")
print(f"📁 {folder_name}/")

for obj in page.get("Contents", []):
found_contents = True
file_path = obj["Key"][len(prefix) :]
if file_path:
size_mb = obj["Size"] / (1024 * 1024)
print(f"📄 {file_path}")
print(f" Size: {size_mb:.2f} MB")
print(f" Last Modified: {obj['LastModified']}")

if not found_contents:
print("No contents found in the specified path.")

except Exception as e:
print(f"Error accessing contents: {str(e)}")


def main():
parser = argparse.ArgumentParser(description="S3 Content Manager")
parser.add_argument(
"s3_path", help="S3 path to operate on (e.g., s3://bucket/prefix)"
)
parser.add_argument(
"--list", action="store_true", help="List all folders and files"
)
parser.add_argument("--nuke", action="store_true", help="Delete all contents")
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be deleted without actually deleting",
)
parser.add_argument(
"--region", default="us-east-1", help="AWS region (default: us-east-1)"
)

args = parser.parse_args()

# Get environment variables
role_arn = os.environ.get("DH_ICEBERG_AWS_ROLE")
client_id = os.environ.get("DH_ICEBERG_CLIENT_ID")
client_secret = os.environ.get("DH_ICEBERG_CLIENT_SECRET")

if not all([role_arn, client_id, client_secret]):
raise ValueError(
"Missing required environment variables. Please set DH_ICEBERG_AWS_ROLE, DH_ICEBERG_CLIENT_ID, and DH_ICEBERG_CLIENT_SECRET"
)

# Validate arguments
if sum([args.list, args.nuke, args.dry_run]) != 1:
parser.error("Please specify exactly one action: --list, --nuke, or --dry-run")

list_s3_contents(
args.s3_path,
client_id=client_id, # type: ignore
client_secret=client_secret, # type: ignore
role_arn=role_arn, # type: ignore
region=args.region,
# delimiter='/' if args.list else None,
nuke=args.nuke,
dry_run=args.dry_run,
)


if __name__ == "__main__":
main()
48 changes: 48 additions & 0 deletions metadata-ingestion/examples/iceberg/provision_warehouse.py
@@ -0,0 +1,48 @@
"""
A script to provision a warehouse on DataHub (and Iceberg).
This script uses environment variables to configure the Iceberg client and
provision a warehouse on DataHub. The required environment variables are:
- DH_ICEBERG_CLIENT_ID: The client ID for the Icebreaker service.
- DH_ICEBERG_CLIENT_SECRET: The client secret for the Icebreaker service.
- DH_ICEBERG_AWS_ROLE: The test role for the Icebreaker service.
- DH_ICEBERG_DATA_ROOT: The root directory for Icebreaker data.
The script asserts the presence of these environment variables and then
executes a system command to create the warehouse using the DataHub Iceberg CLI.
Usage:
Ensure the required environment variables are set, then run the script.
Example:
$ export DH_ICEBERG_CLIENT_ID="your_client_id"
$ export DH_ICEBERG_CLIENT_SECRET="your_client_secret"
$ export DH_ICEBERG_AWS_ROLE="your_test_role"
$ export DH_ICEBERG_DATA_ROOT="your_data_root"
$ python provision_warehouse.py
"""

import os

from constants import warehouse

# Assert that env variables are present

assert os.environ.get("DH_ICEBERG_CLIENT_ID"), (
"DH_ICEBERG_CLIENT_ID variable is not present"
)
assert os.environ.get("DH_ICEBERG_CLIENT_SECRET"), (
"DH_ICEBERG_CLIENT_SECRET variable is not present"
)
assert os.environ.get("DH_ICEBERG_AWS_ROLE"), (
"DH_ICEBERG_AWS_ROLE variable is not present"
)
assert os.environ.get("DH_ICEBERG_DATA_ROOT"), (
"DH_ICEBERG_DATA_ROOT variable is not present"
)

assert os.environ.get("DH_ICEBERG_DATA_ROOT", "").startswith("s3://")

os.system(
f"datahub iceberg create --warehouse {warehouse} --data_root $DH_ICEBERG_DATA_ROOT/{warehouse} --client_id $DH_ICEBERG_CLIENT_ID --client_secret $DH_ICEBERG_CLIENT_SECRET --region 'us-east-1' --role $DH_ICEBERG_AWS_ROLE"
)
15 changes: 15 additions & 0 deletions metadata-ingestion/examples/iceberg/read_table.py
@@ -0,0 +1,15 @@
from constants import namespace, table_name, warehouse
from pyiceberg.catalog import load_catalog

# Load the catalog
from datahub.ingestion.graph.client import get_default_graph

graph = get_default_graph()

catalog = load_catalog("local_datahub", warehouse=warehouse, token=graph.config.token)
# Load the Iceberg table and scan it into DuckDB
table = catalog.load_table(f"{namespace}.{table_name}")
con = table.scan().to_duckdb(table_name=f"{table_name}")

for row in con.execute(f"SELECT * FROM {table_name}").fetchall():
print(row)
3 changes: 3 additions & 0 deletions metadata-ingestion/examples/iceberg/requirements.txt
@@ -0,0 +1,3 @@
# The code in this directory requires the latest pyiceberg
pyiceberg >= 0.8.1
pyarrow >= 19.0.0
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
@@ -448,6 +448,7 @@
| pyhive_common
| {"psycopg2-binary", "pymysql>=1.0.2"},
"iceberg": iceberg_common,
"iceberg-catalog": aws_common,
"json-schema": set(),
"kafka": kafka_common | kafka_protobuf,
"kafka-connect": sql_common | {"requests", "JPype1"},
@@ -631,6 +632,7 @@
"elasticsearch",
"feast",
"iceberg",
"iceberg-catalog",
"mlflow",
"json-schema",
"ldap",
@@ -693,6 +695,7 @@
"hana",
"hive",
"iceberg",
"iceberg-catalog",
"kafka-connect",
"ldap",
"mongodb",
707 changes: 707 additions & 0 deletions metadata-ingestion/src/datahub/cli/iceberg_cli.py

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/entrypoints.py
@@ -183,6 +183,18 @@ def init(use_password: bool = False) -> None:
datahub.add_command(assertions)
datahub.add_command(container)

try:
from datahub.cli.iceberg_cli import iceberg

datahub.add_command(iceberg)
except ImportError as e:
logger.debug(f"Failed to load datahub iceberg command: {e}")
datahub.add_command(
make_shim_command(
"iceberg", "run `pip install 'acryl-datahub[iceberg-catalog]'`"
)
)

try:
from datahub.cli.lite_cli import lite

Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package com.linkedin.metadata.entity;

import static com.linkedin.metadata.Constants.APP_SOURCE;
import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
import static com.linkedin.metadata.Constants.FORCE_INDEXING_KEY;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.SYSTEM_ACTOR;
import static com.linkedin.metadata.Constants.UI_SOURCE;
import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.entity.TransactionContext.DEFAULT_MAX_TRANSACTION_RETRY;
import static com.linkedin.metadata.utils.PegasusUtils.constructMCL;
import static com.linkedin.metadata.utils.PegasusUtils.getDataTemplateClassFromSchema;
@@ -1502,20 +1497,37 @@ public String batchApplyRetention(

private boolean preprocessEvent(
@Nonnull OperationContext opContext, MetadataChangeLog metadataChangeLog) {
if (updateIndicesService == null) {
return false;
}

boolean syncIndexUpdate = false;

if (preProcessHooks.isUiEnabled()) {
if (metadataChangeLog.getSystemMetadata() != null) {
if (metadataChangeLog.getSystemMetadata().getProperties() != null) {
if (UI_SOURCE.equals(
metadataChangeLog.getSystemMetadata().getProperties().get(APP_SOURCE))) {
// Pre-process the update indices hook for UI updates to avoid perceived lag from Kafka
if (updateIndicesService != null) {
updateIndicesService.handleChangeEvent(opContext, metadataChangeLog);
}
return true;
syncIndexUpdate = true;
}
}
}
}
if (!syncIndexUpdate && metadataChangeLog.getHeaders() != null) {
if (metadataChangeLog
.getHeaders()
.getOrDefault(SYNC_INDEX_UPDATE_HEADER_NAME, "false")
.equalsIgnoreCase(Boolean.toString(true))) {
// A specific MCP requested a sync index update.
syncIndexUpdate = true;
}
}

if (syncIndexUpdate) {
updateIndicesService.handleChangeEvent(opContext, metadataChangeLog);
return true;
}
return false;
}

Original file line number Diff line number Diff line change
@@ -2006,6 +2006,97 @@ public void testUIPreProcessedProposal() throws Exception {
"datasetKey"));
}

@Test
public void testSyncHeaderPreProcessedProposal() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)");
EditableDatasetProperties datasetProperties = new EditableDatasetProperties();
datasetProperties.setDescription("Foo Bar");
MetadataChangeProposal gmce = new MetadataChangeProposal();
gmce.setEntityUrn(entityUrn);
gmce.setChangeType(ChangeType.UPSERT);
gmce.setEntityType("dataset");
gmce.setAspectName("editableDatasetProperties");

JacksonDataTemplateCodec dataTemplateCodec = new JacksonDataTemplateCodec();
byte[] datasetPropertiesSerialized = dataTemplateCodec.dataTemplateToBytes(datasetProperties);
GenericAspect genericAspect = new GenericAspect();
genericAspect.setValue(ByteString.unsafeWrap(datasetPropertiesSerialized));
genericAspect.setContentType("application/json");
gmce.setAspect(genericAspect);

// verify with sync header
StringMap headers = new StringMap();
headers.put(SYNC_INDEX_UPDATE_HEADER_NAME, "true");
gmce.setHeaders(headers);

ArgumentCaptor<MetadataChangeLog> mceCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class);
_entityServiceImpl.ingestProposal(opContext, gmce, TEST_AUDIT_STAMP, false);

verify(_mockUpdateIndicesService, times(1))
.handleChangeEvent(eq(opContext), mceCaptor.capture());
assertTrue(
mceCaptor
.getValue()
.getHeaders()
.get(SYNC_INDEX_UPDATE_HEADER_NAME)
.equalsIgnoreCase("true"));
assertEquals(mceCaptor.getValue().getEntityUrn(), entityUrn);
}

@Test
public void testWithoutSyncHeaderOrUISourcePreProcessedProposal() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)");
EditableDatasetProperties datasetProperties = new EditableDatasetProperties();
datasetProperties.setDescription("Foo Bar");
MetadataChangeProposal gmce = new MetadataChangeProposal();
gmce.setEntityUrn(entityUrn);
gmce.setChangeType(ChangeType.UPSERT);
gmce.setEntityType("dataset");
gmce.setAspectName("editableDatasetProperties");

JacksonDataTemplateCodec dataTemplateCodec = new JacksonDataTemplateCodec();
byte[] datasetPropertiesSerialized = dataTemplateCodec.dataTemplateToBytes(datasetProperties);
GenericAspect genericAspect = new GenericAspect();
genericAspect.setValue(ByteString.unsafeWrap(datasetPropertiesSerialized));
genericAspect.setContentType("application/json");
gmce.setAspect(genericAspect);

ArgumentCaptor<MetadataChangeLog> mceCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class);
_entityServiceImpl.ingestProposal(opContext, gmce, TEST_AUDIT_STAMP, false);

verify(_mockUpdateIndicesService, never()).handleChangeEvent(any(), any());
}

@Test
public void testWithNullUpdateIndicesServicePreProcessedProposal() throws Exception {
_entityServiceImpl.setUpdateIndicesService(
null); // this should cause skipping of the sync index update
Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)");
EditableDatasetProperties datasetProperties = new EditableDatasetProperties();
datasetProperties.setDescription("Foo Bar");
MetadataChangeProposal gmce = new MetadataChangeProposal();
gmce.setEntityUrn(entityUrn);
gmce.setChangeType(ChangeType.UPSERT);
gmce.setEntityType("dataset");
gmce.setAspectName("editableDatasetProperties");

StringMap headers = new StringMap();
headers.put(SYNC_INDEX_UPDATE_HEADER_NAME, "true");
gmce.setHeaders(headers);

JacksonDataTemplateCodec dataTemplateCodec = new JacksonDataTemplateCodec();
byte[] datasetPropertiesSerialized = dataTemplateCodec.dataTemplateToBytes(datasetProperties);
GenericAspect genericAspect = new GenericAspect();
genericAspect.setValue(ByteString.unsafeWrap(datasetPropertiesSerialized));
genericAspect.setContentType("application/json");
gmce.setAspect(genericAspect);

ArgumentCaptor<MetadataChangeLog> mceCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class);
_entityServiceImpl.ingestProposal(opContext, gmce, TEST_AUDIT_STAMP, false);

verify(_mockUpdateIndicesService, never()).handleChangeEvent(any(), any());
}

@Test
public void testStructuredPropertyIngestProposal() throws Exception {
String urnStr = "urn:li:dataset:(urn:li:dataPlatform:looker,sample_dataset_unique,PROD)";
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace com.linkedin.dataplatforminstance

import com.linkedin.common.Urn
import com.linkedin.common.FabricType

/**
* An Iceberg warehouse location and credentials whose reads/writes are governed by the DataHub catalog.
*/
@Aspect = {
"name": "icebergWarehouseInfo"
}
record IcebergWarehouseInfo {

/**
* Path of the root for the backing store of the tables in the warehouse.
*/
dataRoot: string

/**
* clientId to be used to authenticate with storage hosting this warehouse
*/
clientId: Urn

/**
* client secret to authenticate with storage hosting this warehouse
*/
clientSecret: Urn

/**
* region where the warehouse is located.
*/
region: string

/**
* Role to be used when vending credentials to writers.
*/
role: optional string

/**
* Expiration for temporary credentials created to access this warehouse.
*/
tempCredentialExpirationSeconds: optional int

/**
* Environment where all assets stored in this warehouse belong to
*/
env: FabricType
}
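
A minimal Java sketch of assembling this aspect with placeholder values; in practice the `datahub iceberg create` CLI added in this change provisions warehouses with these fields:

```java
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.dataplatforminstance.IcebergWarehouseInfo;

class IcebergWarehouseInfoExample {
  // Assembles an icebergWarehouseInfo aspect; client credentials are referenced as secret URNs.
  static IcebergWarehouseInfo exampleWarehouse() {
    return new IcebergWarehouseInfo()
        .setDataRoot("s3://example-bucket/warehouses/arctic_warehouse") // placeholder data root
        .setClientId(UrnUtils.getUrn("urn:li:dataHubSecret:iceberg-client-id")) // placeholder secret URNs
        .setClientSecret(UrnUtils.getUrn("urn:li:dataHubSecret:iceberg-client-secret"))
        .setRegion("us-east-1")
        .setRole("arn:aws:iam::000000000000:role/iceberg-writer") // placeholder role ARN
        .setTempCredentialExpirationSeconds(3600)
        .setEnv(FabricType.PROD);
  }
}
```
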
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace com.linkedin.dataset

import com.linkedin.common.Uri
import com.linkedin.common.CustomProperties
import com.linkedin.common.ExternalReference
import com.linkedin.common.TimeStamp

/**
* Iceberg Catalog metadata associated with an Iceberg table/view
*/
@Aspect = {
"name": "icebergCatalogInfo"
}
record IcebergCatalogInfo {

/**
* When DataHub is the REST catalog for an Iceberg table, this stores the current metadata pointer.
* If the Iceberg table is managed by an external catalog, the metadata pointer is not set.
*/
metadataPointer: optional string

view: optional boolean

// tableProperties: map[string, string] = { }
}
2 changes: 2 additions & 0 deletions metadata-models/src/main/resources/entity-registry.yml
@@ -47,6 +47,7 @@ entities:
- forms
- partitionsSummary
- versionProperties
- icebergCatalogInfo
- name: dataHubPolicy
doc: DataHub Policies represent access policies granted to users or groups on metadata operations like edit, view etc.
category: internal
@@ -337,6 +338,7 @@ entities:
- institutionalMemory
- deprecation
- status
- icebergWarehouseInfo
- name: mlModel
category: core
keyAspect: mlModelKey
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ baseUrl: ${DATAHUB_BASE_URL:http://localhost:9002}
authentication:
# Enable if you want all requests to the Metadata Service to be authenticated.
enabled: ${METADATA_SERVICE_AUTH_ENABLED:true}
excludedPaths: /schema-registry/*,/health,/config,/config/search/export
excludedPaths: /schema-registry/*,/health,/config,/config/search/export,/public-iceberg/*

# Disable if you want to skip validation of deleted user's tokens
enforceExistenceEnabled: ${METADATA_SERVICE_AUTH_ENFORCE_EXISTENCE_ENABLED:true}
@@ -649,3 +649,7 @@ metadataChangeProposal:
initialIntervalMs: ${MCP_TIMESERIES_INITIAL_INTERVAL_MS:100}
multiplier: ${MCP_TIMESERIES_MULTIPLIER:10}
maxIntervalMs: ${MCP_TIMESERIES_MAX_INTERVAL_MS:30000}

icebergCatalog:
enablePublicRead: ${ENABLE_PUBLIC_READ:false}
publiclyReadableTag: ${PUBLICLY_READABLE_TAG:PUBLICLY_READABLE}
84 changes: 84 additions & 0 deletions metadata-service/iceberg-catalog/build.gradle
@@ -0,0 +1,84 @@
plugins {
id 'java'
}
apply from: '../../gradle/coverage/java-coverage.gradle'

ext {
python_executable = 'python3'
venv_name = 'venv'
}

dependencies {
implementation project(':metadata-service:services')
implementation project(':metadata-models')
implementation project(':metadata-utils')
implementation project(':metadata-operation-context')
implementation project(':metadata-integration:java:datahub-schematron:lib')
implementation 'org.apache.iceberg:iceberg-core:1.6.1'
implementation 'org.apache.iceberg:iceberg-aws:1.6.1'
implementation 'software.amazon.awssdk:sts:2.26.12'
implementation 'software.amazon.awssdk:iam-policy-builder:2.26.12'
implementation externalDependency.awsS3

implementation externalDependency.reflections
implementation externalDependency.springBoot
implementation externalDependency.springCore
implementation(externalDependency.springDocUI) {
exclude group: 'org.springframework.boot'
}
implementation externalDependency.springWeb
implementation externalDependency.springWebMVC
implementation externalDependency.springBeans
implementation externalDependency.springContext
implementation externalDependency.springBootAutoconfigure
implementation externalDependency.servletApi
implementation externalDependency.slf4jApi
compileOnly externalDependency.lombok
implementation externalDependency.antlr4Runtime
implementation externalDependency.antlr4
implementation externalDependency.javaxInject
implementation externalDependency.avro
implementation externalDependency.guava

annotationProcessor externalDependency.lombok

testImplementation externalDependency.springBootTest
testImplementation project(':mock-entity-registry')
testImplementation externalDependency.springBoot
testImplementation externalDependency.testContainers
testImplementation externalDependency.testContainersKafka
testImplementation externalDependency.springKafka
testImplementation externalDependency.testng
testImplementation externalDependency.mockito
testImplementation externalDependency.logbackClassic
testImplementation externalDependency.jacksonCore
testImplementation externalDependency.jacksonDataBind
testImplementation externalDependency.springBootStarterWeb
}

task installDev(type: Exec) {
inputs.file file('pyproject.toml')
inputs.file file('requirements.txt')
outputs.file("${venv_name}/.build_install_dev_sentinel")
commandLine 'bash', '-c',
"set -x && " +
"${python_executable} -m venv ${venv_name} && " +
"${venv_name}/bin/python -m pip install --upgrade uv && " +
"set +x && source ${venv_name}/bin/activate && set -x && " +
"uv pip install -r requirements.txt && " +
"touch ${venv_name}/.build_install_dev_sentinel"
}

task integrationTestQuick(type: Exec, dependsOn: installDev) {
workingDir = project.projectDir
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"pytest -m quick"
}

task integrationTest(type: Exec, dependsOn: installDev) {
workingDir = project.projectDir
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"pytest"
}
46 changes: 46 additions & 0 deletions metadata-service/iceberg-catalog/pyproject.toml
@@ -0,0 +1,46 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[project]
name = "iceberg-catalog-integration-test"
version = "0.0.0"
description = ""
authors = [
{ name="Acryl Data", email="eng@acryl.io" },
]
requires-python = ">=3.9"


[tool.black]
extend-exclude = '''
# A regex preceded with ^/ will apply only to files and directories
# in the root of the project.
tmp
venv
'''
include = '\.pyi?$'
target-version = ['py310']

[tool.isort]
profile = 'black'

[tool.ruff]
ignore = [
'E501', # Ignore line length, since black handles that.
'D203', # Ignore 1 blank line required before class docstring.
]

[tool.mypy]
exclude = "^(venv/|build/|dist/)"
ignore_missing_imports = true
namespace_packages = false
check_untyped_defs = true
disallow_untyped_decorators = true
warn_unused_configs = true
# eventually we'd like to enable these
disallow_incomplete_defs = false
disallow_untyped_defs = false

[tool.pyright]
extraPaths = ['tests']
16 changes: 16 additions & 0 deletions metadata-service/iceberg-catalog/requirements.txt
@@ -0,0 +1,16 @@
pytest>=6.2
pytest-dependency>=0.5.1
pyspark==3.5.3
-e ../../metadata-ingestion[iceberg-catalog]
# libraries for linting below this
black==23.7.0
isort==5.12.0
mypy==1.5.1
ruff==0.0.287
# stub versions are copied from metadata-ingestion/setup.py, which should be the source of truth
types-requests>=2.28.11.6,<=2.31.0.3
types-PyYAML
# https://github.com/docker/docker-py/issues/3256
requests<=2.31.0
# Missing numpy requirement in 8.0.0
deepdiff!=8.0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
from pyspark.sql import SparkSession
import os
from urllib.parse import urlparse
import pytest
from datahub.cli import cli_utils, env_utils, iceberg_cli
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph


def get_gms_url():
return os.getenv("DATAHUB_GMS_URL") or "http://localhost:8080"


@pytest.fixture
def personal_access_token():
username = "datahub"
password = "datahub"
token_name, token = cli_utils.generate_access_token(
username, password, get_gms_url()
)

# Setting these env vars makes get_default_graph use them to create a GraphQL client.
os.environ["DATAHUB_GMS_TOKEN"] = token
os.environ["DATAHUB_GMS_HOST"] = urlparse(get_gms_url()).hostname
os.environ["DATAHUB_GMS_PORT"] = str(urlparse(get_gms_url()).port)

yield token

# revoke token


def give_all_permissions(username, policy_name):
client = get_default_graph()
query = """
mutation createAdminRole($policyName: String!, $user: String!) {
createPolicy(
input: {
name: $policyName,
description: "For Testing",
state: ACTIVE,
type: METADATA,
privileges: ["DATA_READ_WRITE", "DATA_MANAGE_NAMESPACES", "DATA_MANAGE_TABLES", "DATA_MANAGE_VIEWS", "DATA_MANAGE_NAMESPACES", "DATA_LIST_ENTITIES"],
actors: {users: [$user],
allUsers: false,
resourceOwners: true,
allGroups: false}}
)
}
"""
variables = {"user": f"urn:li:corpuser:{username}", "policyName": policy_name}

response = client.execute_graphql(
query, variables=variables, format_exception=False
)


@pytest.fixture
def spark_session(personal_access_token, warehouse):
# Create a Spark session

spark = (
SparkSession.builder.appName("Simple Example")
.config(
"spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1",
)
.config("spark.sql.catalog.test", "org.apache.iceberg.spark.SparkCatalog")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.iceberg.spark.SparkSessionCatalog",
)
.config(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
)
.config("spark.sql.catalog.test.type", "rest")
.config("spark.sql.catalog.test.uri", f"{get_gms_url()}/iceberg")
.config("spark.sql.catalog.test.warehouse", warehouse)
.config("spark.sql.catalog.test.token", personal_access_token)
.config("spark.sql.defaultCatalog", "test")
.config("spark.sql.catalog.test.default-namespace", "default")
.config(
"spark.sql.catalog.test.header.X-Iceberg-Access-Delegation",
"vended-credentials",
)
.config("spark.sql.catalog.test.rest-metrics-reporting-enabled", False)
.master("local[*]")
.getOrCreate()
)

# ensure default namespace
spark.sql("create namespace if not exists default")

yield spark

# Stop the Spark session
spark.stop()


@pytest.fixture(params=[f"test_wh_{index}" for index in range(4)])
def warehouse(request, personal_access_token):
warehouse_name = request.param
# PAT dependency just to ensure env vars are setup with token
give_all_permissions("datahub", "test-policy")

data_root = os.getenv(
"ICEBERG_DATA_ROOT", f"s3://srinath-dev/test/{warehouse_name}"
)
client_id = os.getenv("ICEBERG_CLIENT_ID")
client_secret = os.getenv("ICEBERG_CLIENT_SECRET")
region = os.getenv("ICEBERG_REGION")
role = os.getenv("ICEBERG_ROLE")

if not all((data_root, client_id, client_secret, region, role)):
pytest.fail(
"Must set ICEBERG_DATA_ROOT, ICEBERG_CLIENT_ID, ICEBERG_CLIENT_SECRET, ICEBERG_REGION, ICEBERG_ROLE"
)

try:
iceberg_cli.delete.callback(warehouse_name, dry_run=False, force=True)
print(
f"Deleted warehouse {warehouse_name}"
) # This ensures we are starting with a new warehouse.
except Exception as e:
print(e)

iceberg_cli.create.callback(
warehouse=warehouse_name,
description="",
data_root=data_root,
client_id=client_id,
client_secret=client_secret,
region=region,
role=role,
env="PROD",
duration_seconds=60 * 60,
)

yield warehouse_name


def cleanup(session):
# Cleanup any remnants of past test runs
session.sql("drop table if exists test_table")
session.sql("drop view if exists test_view")


def _test_basic_table_ops(spark_session):
spark_session.sql("create table test_table (id int, name string)")

spark_session.sql("insert into test_table values(1, 'foo' ) ")
result = spark_session.sql("SELECT * FROM test_table")
assert result.count() == 1

spark_session.sql("update test_table set name='bar' where id=1")
result = spark_session.sql("SELECT * FROM test_table where name='bar'")
assert result.count() == 1

spark_session.sql("delete from test_table")
result = spark_session.sql("SELECT * FROM test_table")
assert result.count() == 0

spark_session.sql("drop table test_table")
try:
spark_session.sql("select * from test_table")
assert False, "Table must not exist"
except:
pass # Exception is expected

# TODO: Add dataset verification


def _test_basic_view_ops(spark_session):
spark_session.sql("create table test_table (id int, name string)")
spark_session.sql("insert into test_table values(1, 'foo' ) ")

spark_session.sql("create view test_view AS select * from test_table")
result = spark_session.sql("SELECT * FROM test_view")
assert result.count() == 1

spark_session.sql("DROP VIEW test_view")
try:
spark_session.sql("SELECT * FROM test_view")
assert False, "test_view must not exist"
except:
pass # Exception is expected

spark_session.sql("drop table test_table")


def _test_rename_ops(spark_session):
spark_session.sql("create table test_table (id int, name string)")
spark_session.sql("insert into test_table values(1, 'foo' ) ")

spark_session.sql("alter table test_table rename to test_table_renamed")

try:
spark_session.sql("SELECT * FROM test_table")
assert False, "test_table must not exist"
except:
pass # Exception is expected

spark_session.sql("insert into test_table_renamed values(2, 'bar' ) ")
result = spark_session.sql("SELECT * FROM test_table_renamed")
assert result.count() == 2

spark_session.sql("create view test_view as select * from test_table_renamed")
result = spark_session.sql("SELECT * FROM test_view")
assert result.count() == 2

spark_session.sql("alter view test_view rename to test_view_renamed")
result = spark_session.sql("SELECT * FROM test_view_renamed")
assert result.count() == 2

spark_session.sql("drop view test_view_renamed")
spark_session.sql("drop table test_table_renamed")


@pytest.mark.quick
@pytest.mark.parametrize("warehouse", ["test_wh_0"], indirect=True)
def test_iceberg_quick(spark_session, warehouse):
spark_session.sql("use namespace default")
_test_basic_table_ops(spark_session)
_test_basic_view_ops(spark_session)
_test_rename_ops(spark_session)


def _create_table(spark_session, ns, table_name):
spark_session.sql("create namespace if not exists default")
spark_session.sql(f"create namespace if not exists {ns}")
spark_session.sql(f"drop table if exists {ns}.{table_name}")
spark_session.sql(f"create table {ns}.{table_name} (id int, name string)")

spark_session.sql(f"insert into {ns}.{table_name} values (1, 'foo' ) ")


def test_load_tables(spark_session, warehouse):
namespace_count = 3
table_count = 4
for ns_index in range(namespace_count):
ns = f"default_ns{ns_index}"
for table_index in range(table_count):
table_name = f"table_{table_index}"
_create_table(spark_session, ns, table_name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
package io.datahubproject.iceberg.catalog;

import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;
import static io.datahubproject.iceberg.catalog.Utils.*;

import com.google.common.util.concurrent.Striped;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataplatforminstance.IcebergWarehouseInfo;
import com.linkedin.dataset.IcebergCatalogInfo;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.platformresource.PlatformResourceInfo;
import com.linkedin.secret.DataHubSecretValue;
import com.linkedin.util.Pair;
import io.datahubproject.iceberg.catalog.credentials.CredentialProvider;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.services.SecretService;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.locks.Lock;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.*;

public class DataHubIcebergWarehouse {

public static final String DATASET_ICEBERG_METADATA_ASPECT_NAME = "icebergCatalogInfo";
public static final String DATAPLATFORM_INSTANCE_ICEBERG_WAREHOUSE_ASPECT_NAME =
"icebergWarehouseInfo";

private final EntityService entityService;

private final SecretService secretService;

private final OperationContext operationContext;

private final IcebergWarehouseInfo icebergWarehouse;

@Getter private final String platformInstance;

// TODO: Need to handle locks for deployments with multiple GMS replicas.
private static final Striped<Lock> resourceLocks =
Striped.lazyWeakLock(Runtime.getRuntime().availableProcessors() * 2);

private DataHubIcebergWarehouse(
String platformInstance,
IcebergWarehouseInfo icebergWarehouse,
EntityService entityService,
SecretService secretService,
OperationContext operationContext) {
this.platformInstance = platformInstance;
this.icebergWarehouse = icebergWarehouse;
this.entityService = entityService;
this.secretService = secretService;
this.operationContext = operationContext;
}

public static DataHubIcebergWarehouse of(
String platformInstance,
EntityService entityService,
SecretService secretService,
OperationContext operationContext) {
Urn platformInstanceUrn = Utils.platformInstanceUrn(platformInstance);
RecordTemplate warehouseAspect =
entityService.getLatestAspect(
operationContext,
platformInstanceUrn,
DATAPLATFORM_INSTANCE_ICEBERG_WAREHOUSE_ASPECT_NAME);

if (warehouseAspect == null) {
throw new NotFoundException("Unknown warehouse " + platformInstance);
}

IcebergWarehouseInfo icebergWarehouse = new IcebergWarehouseInfo(warehouseAspect.data());
return new DataHubIcebergWarehouse(
platformInstance, icebergWarehouse, entityService, secretService, operationContext);
}

public CredentialProvider.StorageProviderCredentials getStorageProviderCredentials() {

Urn clientIdUrn, clientSecretUrn;
String role, region;
Integer expirationSeconds;

clientIdUrn = icebergWarehouse.getClientId();
clientSecretUrn = icebergWarehouse.getClientSecret();
role = icebergWarehouse.getRole();
region = icebergWarehouse.getRegion();
expirationSeconds = icebergWarehouse.getTempCredentialExpirationSeconds();

Map<Urn, List<RecordTemplate>> credsMap =
entityService.getLatestAspects(
operationContext,
Set.of(clientIdUrn, clientSecretUrn),
Set.of("dataHubSecretValue"),
false);

DataHubSecretValue clientIdValue =
new DataHubSecretValue(credsMap.get(clientIdUrn).get(0).data());

String clientId = secretService.decrypt(clientIdValue.getValue());

DataHubSecretValue clientSecretValue =
new DataHubSecretValue(credsMap.get(clientSecretUrn).get(0).data());
String clientSecret = secretService.decrypt(clientSecretValue.getValue());

return new CredentialProvider.StorageProviderCredentials(
clientId, clientSecret, role, region, expirationSeconds);
}

public String getDataRoot() {
return icebergWarehouse.getDataRoot();
}

public Optional<DatasetUrn> getDatasetUrn(TableIdentifier tableIdentifier) {
Urn resourceUrn = resourceUrn(tableIdentifier);
PlatformResourceInfo platformResourceInfo =
(PlatformResourceInfo)
entityService.getLatestAspect(
operationContext, resourceUrn, PLATFORM_RESOURCE_INFO_ASPECT_NAME);
if (platformResourceInfo == null) {
return Optional.empty();
}
try {
return Optional.of(DatasetUrn.createFromString(platformResourceInfo.getPrimaryKey()));
} catch (URISyntaxException e) {
throw new RuntimeException("Invalid dataset urn " + platformResourceInfo.getPrimaryKey(), e);
}
}

public IcebergCatalogInfo getIcebergMetadata(TableIdentifier tableIdentifier) {
Optional<DatasetUrn> datasetUrn = getDatasetUrn(tableIdentifier);
if (datasetUrn.isEmpty()) {
return null;
}

IcebergCatalogInfo icebergMeta =
(IcebergCatalogInfo)
entityService.getLatestAspect(
operationContext, datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);

if (icebergMeta == null) {
throw new IllegalStateException(
String.format(
"IcebergMetadata not found for resource %s, dataset %s",
resourceUrn(tableIdentifier), datasetUrn.get()));
}
return icebergMeta;
}

public Pair<EnvelopedAspect, DatasetUrn> getIcebergMetadataEnveloped(
TableIdentifier tableIdentifier) {
Optional<DatasetUrn> datasetUrn = getDatasetUrn(tableIdentifier);
if (datasetUrn.isEmpty()) {
return null;
}

try {
EnvelopedAspect existingEnveloped =
entityService.getLatestEnvelopedAspect(
operationContext,
DATASET_ENTITY_NAME,
datasetUrn.get(),
DATASET_ICEBERG_METADATA_ASPECT_NAME);
if (existingEnveloped == null) {
throw new IllegalStateException(
String.format(
"IcebergMetadata not found for resource %s, dataset %s",
resourceUrn(tableIdentifier), datasetUrn.get()));
}
return Pair.of(existingEnveloped, datasetUrn.get());
} catch (Exception e) {
throw new RuntimeException(
"Error fetching IcebergMetadata aspect for dataset " + datasetUrn.get(), e);
}
}

public boolean deleteDataset(TableIdentifier tableIdentifier) {
Urn resourceUrn = resourceUrn(tableIdentifier);

// guard against concurrent modifications that depend on the resource (rename table/view)
Lock lock = resourceLocks.get(resourceUrn);
lock.lock();
try {
if (!entityService.exists(operationContext, resourceUrn)) {
return false;
}
Optional<DatasetUrn> urn = getDatasetUrn(tableIdentifier);
entityService.deleteUrn(operationContext, resourceUrn);
urn.ifPresent(x -> entityService.deleteUrn(operationContext, x));
return true;
} finally {
lock.unlock();
}
}

public DatasetUrn createDataset(
TableIdentifier tableIdentifier, boolean view, AuditStamp auditStamp) {
String datasetName = platformInstance + "." + UUID.randomUUID();
DatasetUrn datasetUrn = new DatasetUrn(platformUrn(), datasetName, fabricType());
createResource(datasetUrn, tableIdentifier, view, auditStamp);
return datasetUrn;
}

public DatasetUrn renameDataset(
TableIdentifier fromTableId, TableIdentifier toTableId, boolean view, AuditStamp auditStamp) {

// guard against concurrent modifications to the resource (other renames, deletion)
Lock lock = resourceLocks.get(resourceUrn(fromTableId));
lock.lock();

try {
Optional<DatasetUrn> optDatasetUrn = getDatasetUrn(fromTableId);
if (optDatasetUrn.isEmpty()) {
if (view) {
throw new NoSuchViewException(
"No such view %s", fullTableName(platformInstance, fromTableId));
} else {
throw new NoSuchTableException(
"No such table %s", fullTableName(platformInstance, fromTableId));
}
}

DatasetUrn datasetUrn = optDatasetUrn.get();
try {
createResource(datasetUrn, toTableId, view, auditStamp);
} catch (ValidationException e) {
throw new AlreadyExistsException(
"%s already exists: %s",
view ? "View" : "Table", fullTableName(platformInstance, toTableId));
}
entityService.deleteUrn(operationContext, resourceUrn(fromTableId));
return datasetUrn;
} finally {
lock.unlock();
}
}

private void createResource(
DatasetUrn datasetUrn, TableIdentifier tableIdentifier, boolean view, AuditStamp auditStamp) {
PlatformResourceInfo resourceInfo =
new PlatformResourceInfo().setPrimaryKey(datasetUrn.toString());
resourceInfo.setResourceType(view ? "icebergView" : "icebergTable");

MetadataChangeProposal mcp = new MetadataChangeProposal();
mcp.setEntityUrn(resourceUrn(tableIdentifier));
mcp.setEntityType(PLATFORM_RESOURCE_ENTITY_NAME);
mcp.setAspectName(PLATFORM_RESOURCE_INFO_ASPECT_NAME);
mcp.setChangeType(ChangeType.CREATE_ENTITY);
mcp.setAspect(serializeAspect(resourceInfo));

entityService.ingestProposal(operationContext, mcp, auditStamp, false);
}

private FabricType fabricType() {
return icebergWarehouse.getEnv();
}

@SneakyThrows
private Urn resourceUrn(TableIdentifier tableIdentifier) {
return Urn.createFromString(
String.format(
"urn:li:platformResource:%s.%s",
PLATFORM_NAME, CatalogUtil.fullTableName(platformInstance, tableIdentifier)));
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.datahubproject.iceberg.catalog;

import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import lombok.SneakyThrows;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;

public class DataHubTableOps extends BaseMetastoreTableOperations {

private final TableOrViewOpsDelegate<TableMetadata> delegate;

public DataHubTableOps(
DataHubIcebergWarehouse warehouse,
TableIdentifier tableIdentifier,
EntityService entityService,
OperationContext operationContext,
FileIOFactory fileIOFactory) {
this.delegate =
new TableOpsDelegate(
warehouse, tableIdentifier, entityService, operationContext, fileIOFactory);
}

@Override
public TableMetadata refresh() {
return delegate.refresh();
}

@Override
public TableMetadata current() {
return delegate.current();
}

@SneakyThrows
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
delegate.doCommit(
base == null ? null : new MetadataWrapper<>(base),
new MetadataWrapper<>(metadata),
() -> writeNewMetadataIfRequired(base == null, metadata));
}

@Override
protected String tableName() {
return delegate.name();
}

@Override
public FileIO io() {
return delegate.io();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.datahubproject.iceberg.catalog;

import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.view.BaseViewOperations;
import org.apache.iceberg.view.ViewMetadata;

@Slf4j
public class DataHubViewOps extends BaseViewOperations {

private final TableOrViewOpsDelegate<ViewMetadata> delegate;

public DataHubViewOps(
DataHubIcebergWarehouse warehouse,
TableIdentifier tableIdentifier,
EntityService entityService,
OperationContext operationContext,
FileIOFactory fileIOFactory) {
this.delegate =
new ViewOpsDelegate(
warehouse, tableIdentifier, entityService, operationContext, fileIOFactory);
}

@Override
public ViewMetadata refresh() {
return delegate.refresh();
}

@Override
public ViewMetadata current() {
return delegate.current();
}

@Override
protected void doRefresh() {
throw new UnsupportedOperationException();
}

@SneakyThrows
@Override
protected void doCommit(ViewMetadata base, ViewMetadata metadata) {
delegate.doCommit(
base == null ? null : new MetadataWrapper<>(base),
new MetadataWrapper<>(metadata),
() -> writeNewMetadataIfRequired(metadata));
}

@Override
protected String viewName() {
return delegate.name();
}

@Override
public FileIO io() {
return delegate.io();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.datahubproject.iceberg.catalog;

import static com.linkedin.metadata.authorization.PoliciesConfig.*;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.linkedin.metadata.authorization.PoliciesConfig;
import java.util.List;

public enum DataOperation {
READ_ONLY(
DATA_READ_ONLY_PRIVILEGE,
DATA_MANAGE_VIEWS_PRIVILEGE,
DATA_READ_WRITE_PRIVILEGE,
DATA_MANAGE_TABLES_PRIVILEGE),

READ_WRITE(DATA_READ_WRITE_PRIVILEGE, DATA_MANAGE_TABLES_PRIVILEGE),
MANAGE_VIEWS(DATA_MANAGE_VIEWS_PRIVILEGE, DATA_MANAGE_TABLES_PRIVILEGE),
MANAGE_TABLES(DATA_MANAGE_TABLES_PRIVILEGE),
MANAGE_NAMESPACES(DATA_MANAGE_NAMESPACES_PRIVILEGE),

LIST(DATA_LIST_ENTITIES_PRIVILEGE);

public final List<PoliciesConfig.Privilege> ascendingPrivileges;
public final List<PoliciesConfig.Privilege> descendingPrivileges;

DataOperation(PoliciesConfig.Privilege... ascendingPrivileges) {
this.ascendingPrivileges = ImmutableList.copyOf(ascendingPrivileges);
this.descendingPrivileges = Lists.reverse(this.ascendingPrivileges);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.datahubproject.iceberg.catalog;

import com.linkedin.metadata.authorization.PoliciesConfig;
import java.util.Set;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.io.FileIO;

interface FileIOFactory {
FileIO createIO(
String platformInstance, PoliciesConfig.Privilege privilege, Set<String> locations);

FileIO createIO(
String platformInstance, PoliciesConfig.Privilege privilege, TableMetadata tableMetadata);
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.datahubproject.iceberg.catalog;

import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.key.DataPlatformInstanceKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Set;
import lombok.SneakyThrows;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTUtil;

public class Utils {
public static final String PLATFORM_NAME = "iceberg";

private static final String NAMESPACE_CONTAINER_PREFIX = "urn:li:container:iceberg__";

@SneakyThrows
public static AuditStamp auditStamp() {
return new AuditStamp()
.setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
.setTime(System.currentTimeMillis());
}

public static MetadataChangeProposal platformInstanceMcp(
String platformInstanceName, Urn urn, String entityType) {
DataPlatformInstance platformInstance = new DataPlatformInstance();
platformInstance.setPlatform(platformUrn());
platformInstance.setInstance(platformInstanceUrn(platformInstanceName));

MetadataChangeProposal mcp = new MetadataChangeProposal();
mcp.setEntityUrn(urn);
mcp.setEntityType(entityType);
mcp.setAspectName(DATA_PLATFORM_INSTANCE_ASPECT_NAME);
mcp.setAspect(serializeAspect(platformInstance));
mcp.setChangeType(ChangeType.UPSERT);

return mcp;
}

public static DataPlatformUrn platformUrn() {
return new DataPlatformUrn(PLATFORM_NAME);
}

public static Urn platformInstanceUrn(String platformInstance) {
DataPlatformInstanceKey platformInstanceKey =
new DataPlatformInstanceKey().setInstance(platformInstance).setPlatform(platformUrn());
return EntityKeyUtils.convertEntityKeyToUrn(
platformInstanceKey, DATA_PLATFORM_INSTANCE_ENTITY_NAME);
}

public static Urn containerUrn(String platformInstance, Namespace ns) {
return containerUrn(platformInstance, ns.levels());
}

@SneakyThrows
public static Urn containerUrn(String platformInstance, String[] levels) {
StringBuilder containerFullName = new StringBuilder(platformInstance);
for (String level : levels) {
containerFullName.append(".").append(level);
}
return Urn.createFromString(NAMESPACE_CONTAINER_PREFIX + containerFullName);
}

public static String fullTableName(String platformInstance, TableIdentifier tableIdentifier) {
return CatalogUtil.fullTableName(platformInstance, tableIdentifier);
}

public static Set<String> locations(TableMetadata tableMetadata) {
Set<String> locations = new HashSet<>();
locations.add(tableMetadata.location());
if (tableMetadata.properties().containsKey(TableProperties.WRITE_DATA_LOCATION)) {
locations.add(tableMetadata.properties().get(TableProperties.WRITE_DATA_LOCATION));
}
if (tableMetadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION)) {
locations.add(tableMetadata.properties().get(TableProperties.WRITE_METADATA_LOCATION));
}
return locations;
}

public static Namespace namespaceFromString(String namespace) {
return RESTUtil.decodeNamespace(URLEncoder.encode(namespace, Charset.defaultCharset()));
}

public static TableIdentifier tableIdFromString(String namespace, String table) {
return TableIdentifier.of(namespaceFromString(namespace), RESTUtil.decodeString(table));
}

public static String parentDir(String fileLocation) {
return fileLocation.substring(0, fileLocation.lastIndexOf("/"));
}

public static String namespaceNameFromContainerUrn(Urn urn) {
return urn.toString().substring(NAMESPACE_CONTAINER_PREFIX.length());
}
}
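
For orientation, a small usage sketch of the helpers above. The warehouse name and namespace are made up, and the expected URN shapes follow from DataPlatformInstanceKey and the container prefix constant.

package io.datahubproject.iceberg.catalog;

import com.linkedin.common.urn.Urn;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

class UtilsUsageSketch {
  public static void main(String[] args) {
    String platformInstance = "arctic_warehouse"; // made-up warehouse name

    Urn instanceUrn = Utils.platformInstanceUrn(platformInstance);
    // urn:li:dataPlatformInstance:(urn:li:dataPlatform:iceberg,arctic_warehouse)
    System.out.println(instanceUrn);

    Urn namespaceUrn = Utils.containerUrn(platformInstance, Namespace.of("db", "schema"));
    // urn:li:container:iceberg__arctic_warehouse.db.schema
    System.out.println(namespaceUrn);

    TableIdentifier tableId = Utils.tableIdFromString("db", "orders");
    // arctic_warehouse.db.orders
    System.out.println(Utils.fullTableName(platformInstance, tableId));
  }
}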
@@ -0,0 +1,32 @@
package io.datahubproject.iceberg.catalog.credentials;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CachingCredentialProvider implements CredentialProvider {
  // this should be less than the actual token/credential expiration
private static final int EXPIRATION_MINUTES = 5;

private final Cache<CredentialsCacheKey, Map<String, String>> credentialCache;

private final CredentialProvider credentialProvider;

public CachingCredentialProvider(CredentialProvider credentialProvider) {
this.credentialProvider = credentialProvider;
this.credentialCache =
CacheBuilder.newBuilder().expireAfterWrite(EXPIRATION_MINUTES, TimeUnit.MINUTES).build();
}

public Map<String, String> getCredentials(
CredentialsCacheKey key, StorageProviderCredentials storageProviderCredentials) {
try {
return credentialCache.get(
key, () -> credentialProvider.getCredentials(key, storageProviderCredentials));
} catch (ExecutionException e) {
throw new RuntimeException("Error during cache lookup for credentials", e);
}
}
}
@@ -0,0 +1,30 @@
package io.datahubproject.iceberg.catalog.credentials;

import com.linkedin.metadata.authorization.PoliciesConfig;
import java.util.Map;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;

public interface CredentialProvider {

@EqualsAndHashCode
@AllArgsConstructor
class CredentialsCacheKey {
public final String platformInstance;
public final PoliciesConfig.Privilege privilege;
public final Set<String> locations;
}

@AllArgsConstructor
class StorageProviderCredentials {
public final String clientId;
public final String clientSecret;
public final String role;
public final String region;
public final Integer tempCredentialExpirationSeconds;
}

Map<String, String> getCredentials(
CredentialsCacheKey key, StorageProviderCredentials storageProviderCredentials);
}
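
A usage sketch tying these types together with the providers defined in this commit. Every value below is a placeholder, and the call would actually hit AWS STS, so treat it as illustrative only.

package io.datahubproject.iceberg.catalog.credentials;

import com.linkedin.metadata.authorization.PoliciesConfig;
import java.util.Map;
import java.util.Set;

class CredentialProviderUsageSketch {
  public static void main(String[] args) {
    // Placeholder AWS settings; in practice these come from the warehouse configuration.
    CredentialProvider.StorageProviderCredentials storageCreds =
        new CredentialProvider.StorageProviderCredentials(
            "client-id-placeholder",
            "client-secret-placeholder",
            "arn:aws:iam::123456789012:role/warehouse-role",
            "us-east-1",
            3600);

    // Wrap the STS-backed provider with the cache, as the Spring config later in this diff does.
    CredentialProvider provider = new CachingCredentialProvider(new S3CredentialProvider());

    CredentialProvider.CredentialsCacheKey key =
        new CredentialProvider.CredentialsCacheKey(
            "arctic_warehouse",
            PoliciesConfig.DATA_READ_ONLY_PRIVILEGE,
            Set.of("s3://my-bucket/warehouse/db/orders"));

    // Returns client.region, s3.access-key-id, s3.secret-access-key, s3.session-token.
    Map<String, String> vended = provider.getCredentials(key, storageCreds);
    System.out.println(vended.keySet());
  }
}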
@@ -0,0 +1,148 @@
package io.datahubproject.iceberg.catalog.credentials;

import static com.linkedin.metadata.authorization.PoliciesConfig.*;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import lombok.EqualsAndHashCode;
import org.apache.iceberg.exceptions.BadRequestException;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
import software.amazon.awssdk.policybuilder.iam.IamEffect;
import software.amazon.awssdk.policybuilder.iam.IamPolicy;
import software.amazon.awssdk.policybuilder.iam.IamStatement;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;

public class S3CredentialProvider implements CredentialProvider {
private static final int DEFAULT_CREDS_DURATION_SECS = 60 * 60;

public Map<String, String> getCredentials(
CredentialsCacheKey key, StorageProviderCredentials storageProviderCredentials) {

int expiration =
storageProviderCredentials.tempCredentialExpirationSeconds == null
? DEFAULT_CREDS_DURATION_SECS
: storageProviderCredentials.tempCredentialExpirationSeconds;
try (StsClient stsClient = stsClient(storageProviderCredentials)) {
String sessionPolicy = policyString(key);
AssumeRoleResponse response =
stsClient.assumeRole(
AssumeRoleRequest.builder()
.roleArn(storageProviderCredentials.role)
.roleSessionName("DataHubIcebergSession")
.durationSeconds(expiration)
.policy(sessionPolicy)
.build());

return Map.of(
"client.region",
storageProviderCredentials.region,
"s3.access-key-id",
response.credentials().accessKeyId(),
"s3.secret-access-key",
response.credentials().secretAccessKey(),
"s3.session-token",
response.credentials().sessionToken());
}
}

private StsClient stsClient(StorageProviderCredentials storageProviderCredentials) {
AwsBasicCredentials credentials =
AwsBasicCredentials.create(
storageProviderCredentials.clientId, storageProviderCredentials.clientSecret);
    return StsClient.builder()
        .region(Region.of(storageProviderCredentials.region))
        .credentialsProvider(StaticCredentialsProvider.create(credentials))
        .build();
}

private String policyString(CredentialsCacheKey key) {
if (key.locations == null || key.locations.isEmpty()) {
throw new BadRequestException("Unspecified locations for credential vending.");
}
if (!Set.of(DATA_READ_WRITE_PRIVILEGE, DATA_READ_ONLY_PRIVILEGE).contains(key.privilege)) {
throw new IllegalStateException("Unsupported credential vending privilege " + key.privilege);
}

Map<String, IamStatement.Builder> bucketListPolicy = new HashMap<>();
IamStatement.Builder objectsPolicy =
IamStatement.builder()
.effect(IamEffect.ALLOW)
.addAction("s3:GetObject")
.addAction("s3:GetObjectVersion");

if (DATA_READ_WRITE_PRIVILEGE.equals(key.privilege)) {
objectsPolicy.addAction("s3:PutObject").addAction("s3:DeleteObject");
}

key.locations.forEach(
location -> {
S3Location s3Location = new S3Location(location);
objectsPolicy.addResource(s3Location.objectsArn());
bucketListPolicy
.computeIfAbsent(
s3Location.bucketArn(),
bucketArn ->
IamStatement.builder()
.effect(IamEffect.ALLOW)
.addAction("s3:ListBucket")
.addResource(bucketArn))
.addCondition(
IamConditionOperator.STRING_LIKE, "s3:prefix", s3Location.objectsPathPrefix());
});

IamPolicy.Builder sessionPolicyBuilder = IamPolicy.builder();
sessionPolicyBuilder.addStatement(objectsPolicy.build());

for (Map.Entry<String, IamStatement.Builder> bucketListStatement :
bucketListPolicy.entrySet()) {
sessionPolicyBuilder.addStatement(bucketListStatement.getValue().build());

String bucketArn = bucketListStatement.getKey();
sessionPolicyBuilder.addStatement(
IamStatement.builder()
.effect(IamEffect.ALLOW)
.addAction("s3:GetBucketLocation")
.addResource(bucketArn)
.build());
}
return sessionPolicyBuilder.build().toJson();
}

@EqualsAndHashCode
private static class S3Location {
private final String bucket;
private final String path;
private final String s3ArnPrefix;

S3Location(String location) {
URI uri = URI.create(location);
this.bucket = uri.getAuthority();
String path = uri.getPath();
if (path.startsWith("/")) {
path = path.substring(1);
}
this.path = path;
this.s3ArnPrefix = "arn:aws:s3:::";
}

String objectsArn() {
return bucketArn() + "/" + objectsPathPrefix();
}

String bucketArn() {
return s3ArnPrefix + bucket;
}

String objectsPathPrefix() {
return path + "/*";
}
}
}
@@ -0,0 +1,74 @@
package io.datahubproject.iceberg.catalog.rest.common;

import io.datahubproject.iceberg.catalog.rest.open.PublicIcebergApiController;
import io.datahubproject.iceberg.catalog.rest.secure.AbstractIcebergController;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.exceptions.*;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;

@ControllerAdvice(
basePackageClasses = {AbstractIcebergController.class, PublicIcebergApiController.class})
@Slf4j
public class IcebergExceptionHandlerAdvice extends ResponseEntityExceptionHandler {

@ExceptionHandler(AlreadyExistsException.class)
public ResponseEntity<?> handle(AlreadyExistsException e) {
return err(e, HttpStatus.CONFLICT);
}

@ExceptionHandler(NoSuchNamespaceException.class)
public ResponseEntity<?> handle(NoSuchNamespaceException e) {
return err(e, HttpStatus.NOT_FOUND);
}

@ExceptionHandler(NamespaceNotEmptyException.class)
public ResponseEntity<?> handle(NamespaceNotEmptyException e) {
return err(e, HttpStatus.BAD_REQUEST);
}

@ExceptionHandler(NoSuchTableException.class)
public ResponseEntity<?> handle(NoSuchTableException e) {
return err(e, HttpStatus.NOT_FOUND);
}

@ExceptionHandler(NoSuchViewException.class)
public ResponseEntity<?> handle(NoSuchViewException e) {
return err(e, HttpStatus.NOT_FOUND);
}

@ExceptionHandler(NotFoundException.class)
public ResponseEntity<?> handle(NotFoundException e) {
return err(e, HttpStatus.NOT_FOUND);
}

@ExceptionHandler(ForbiddenException.class)
public ResponseEntity<?> handle(ForbiddenException e) {
return err(e, HttpStatus.FORBIDDEN);
}

@ExceptionHandler(BadRequestException.class)
public ResponseEntity<?> handle(BadRequestException e) {
return err(e, HttpStatus.BAD_REQUEST);
}

@ExceptionHandler(Exception.class)
public ResponseEntity<?> handle(Exception e) throws Exception {
log.error("Server exception", e);
throw e;
}

private ResponseEntity<?> err(Exception e, HttpStatus errCode) {
ErrorResponse err =
ErrorResponse.builder()
.responseCode(errCode.value())
.withMessage(e.getMessage())
.withType(e.getClass().getSimpleName())
.build();
return new ResponseEntity<>(err, errCode);
}
}
@@ -0,0 +1,83 @@
package io.datahubproject.iceberg.catalog.rest.common;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import javax.annotation.Nonnull;
import org.springframework.http.MediaType;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;

public class IcebergJsonConverter extends MappingJackson2HttpMessageConverter {
private static final String ICEBERG_PACKAGE_PREFIX = "org.apache.iceberg.";

public IcebergJsonConverter(ObjectMapper objectMapper) {
super(objectMapper);
}

@Override
protected boolean supports(@Nonnull Class<?> clazz) {
return isClassInPackage(clazz);
}

@Override
public boolean canRead(@Nonnull Type type, Class<?> contextClass, MediaType mediaType) {
return hasTypeInPackage(type) && super.canRead(type, contextClass, mediaType);
}

@Override
public boolean canWrite(@Nonnull Class<?> clazz, MediaType mediaType) {
return isClassInPackage(clazz) && super.canWrite(clazz, mediaType);
}

private boolean hasTypeInPackage(Type type) {
if (type instanceof Class<?>) {
return isClassInPackage((Class<?>) type);
}

if (type instanceof ParameterizedType) {
ParameterizedType paramType = (ParameterizedType) type;

// Check raw type
Type rawType = paramType.getRawType();
if (rawType instanceof Class<?> && isClassInPackage((Class<?>) rawType)) {
return true;
}

// Recursively check type arguments
for (Type typeArg : paramType.getActualTypeArguments()) {
if (hasTypeInPackage(typeArg)) {
return true;
}
}
}

if (type instanceof WildcardType) {
WildcardType wildcardType = (WildcardType) type;
// Check upper bounds
for (Type bound : wildcardType.getUpperBounds()) {
if (hasTypeInPackage(bound)) {
return true;
}
}
// Check lower bounds
for (Type bound : wildcardType.getLowerBounds()) {
if (hasTypeInPackage(bound)) {
return true;
}
}
}

if (type instanceof GenericArrayType) {
GenericArrayType arrayType = (GenericArrayType) type;
return hasTypeInPackage(arrayType.getGenericComponentType());
}

return false;
}

private static boolean isClassInPackage(@Nonnull Class<?> clazz) {
return clazz.getName().startsWith(ICEBERG_PACKAGE_PREFIX);
}
}
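
This converter still has to be registered with Spring MVC; that wiring is part of the commit but not shown inline here. A minimal registration sketch, assuming Iceberg's RESTSerializers is used to prepare a dedicated ObjectMapper (the configuration class name is illustrative and the actual wiring in this commit may differ):

package io.datahubproject.iceberg.catalog.rest.common;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.iceberg.rest.RESTSerializers;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
class IcebergConverterRegistrationSketch implements WebMvcConfigurer {

  @Override
  public void extendMessageConverters(List<HttpMessageConverter<?>> converters) {
    // Register Iceberg's REST (de)serializers on a dedicated ObjectMapper so that only
    // org.apache.iceberg.* payloads are handled by this converter.
    ObjectMapper icebergMapper = new ObjectMapper();
    icebergMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    RESTSerializers.registerAll(icebergMapper);
    converters.add(0, new IcebergJsonConverter(icebergMapper));
  }
}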
@@ -0,0 +1,22 @@
package io.datahubproject.iceberg.catalog.rest.common;

import io.datahubproject.iceberg.catalog.credentials.CachingCredentialProvider;
import io.datahubproject.iceberg.catalog.credentials.CredentialProvider;
import io.datahubproject.iceberg.catalog.credentials.S3CredentialProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.*;

@Configuration
public class IcebergSpringWebConfig {

@Bean
public CredentialProvider credentialProvider() {
return new S3CredentialProvider();
}

@Bean
public CredentialProvider cachingCredentialProvider(CredentialProvider credentialProvider) {
return new CachingCredentialProvider(credentialProvider);
}
}
@@ -0,0 +1,107 @@
package io.datahubproject.iceberg.catalog.rest.open;

import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;
import static io.datahubproject.iceberg.catalog.Utils.*;

import com.google.common.base.Strings;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.TagAssociation;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.TagUrn;
import io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse;
import io.datahubproject.iceberg.catalog.rest.secure.AbstractIcebergController;
import java.net.URISyntaxException;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

@Slf4j
@RestController
@RequestMapping("/public-iceberg")
public class PublicIcebergApiController extends AbstractIcebergController {

@Value("${icebergCatalog.enablePublicRead}")
private boolean isPublicReadEnabled;

@Value("${icebergCatalog.publiclyReadableTag}")
private String publiclyReadableTag;

private static final String ACCESS_TYPE_KEY = "access-type";
private static final String ACCESS_TYPE_PUBLIC_READ = "PUBLIC_READ";

@GetMapping(value = "/v1/config", produces = MediaType.APPLICATION_JSON_VALUE)
public ConfigResponse getConfig(
@RequestParam(value = "warehouse", required = true) String warehouse) {
log.info("GET CONFIG for warehouse {}", warehouse);

checkPublicEnabled();

// check that warehouse exists
warehouse(warehouse, systemOperationContext);
ConfigResponse response = ConfigResponse.builder().withOverride("prefix", warehouse).build();
log.info("GET CONFIG response: {}", response);
return response;
}

@GetMapping(
value = "/v1/{prefix}/namespaces/{namespace}/tables/{table}",
produces = MediaType.APPLICATION_JSON_VALUE)
public LoadTableResponse loadTable(
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace,
@PathVariable("table") String table,
@RequestHeader(value = "X-Iceberg-Access-Delegation", required = false)
String xIcebergAccessDelegation,
@RequestParam(value = "snapshots", required = false) String snapshots) {
log.info("GET TABLE REQUEST {}.{}.{}", platformInstance, namespace, table);

checkPublicEnabled();

DataHubIcebergWarehouse warehouse = warehouse(platformInstance, systemOperationContext);
Optional<DatasetUrn> datasetUrn = warehouse.getDatasetUrn(tableIdFromString(namespace, table));
if (datasetUrn.isPresent()) {
GlobalTags tags =
(GlobalTags)
entityService.getLatestAspect(
systemOperationContext, datasetUrn.get(), GLOBAL_TAGS_ASPECT_NAME);
if (tags != null && tags.hasTags()) {
for (TagAssociation tag : tags.getTags()) {
if (publicTag().equals(tag.getTag())) {
LoadTableResponse getTableResponse =
catalogOperation(
platformInstance,
catalog ->
CatalogHandlers.loadTable(catalog, tableIdFromString(namespace, table)));

log.info("GET TABLE RESPONSE {}", getTableResponse);
return getTableResponse;
}
}
}
}

throw new NoSuchTableException(
"No such table %s", fullTableName(platformInstance, tableIdFromString(namespace, table)));
}

void checkPublicEnabled() {
if (!isPublicReadEnabled || Strings.isNullOrEmpty(publiclyReadableTag)) {
throw new NotFoundException("No endpoint GET /v1/config");
}
}

TagUrn publicTag() {
try {
return TagUrn.createFromString("urn:li:tag:" + publiclyReadableTag);
} catch (URISyntaxException e) {
throw new RuntimeException("Invalid public tag " + publiclyReadableTag, e);
}
}
}
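
Public reads depend on the dataset carrying the configured tag. As a hedged sketch that is not part of this commit, the tag could be attached by emitting a GlobalTags aspect, mirroring the MCP construction in Utils above; the class and method names here are illustrative.

package io.datahubproject.iceberg.catalog.rest.open;

import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;
import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;

import com.linkedin.common.GlobalTags;
import com.linkedin.common.TagAssociation;
import com.linkedin.common.TagAssociationArray;
import com.linkedin.common.urn.TagUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.mxe.MetadataChangeProposal;
import java.net.URISyntaxException;
import java.util.List;

class PublicReadTagSketch {

  // Builds an MCP that tags a dataset with the configured publicly-readable tag.
  static MetadataChangeProposal publicReadTagMcp(Urn datasetUrn, String publiclyReadableTag)
      throws URISyntaxException {
    TagAssociation tag =
        new TagAssociation().setTag(TagUrn.createFromString("urn:li:tag:" + publiclyReadableTag));
    GlobalTags tags = new GlobalTags().setTags(new TagAssociationArray(List.of(tag)));

    MetadataChangeProposal mcp = new MetadataChangeProposal();
    mcp.setEntityUrn(datasetUrn);
    mcp.setEntityType(DATASET_ENTITY_NAME);
    mcp.setAspectName(GLOBAL_TAGS_ASPECT_NAME);
    mcp.setAspect(serializeAspect(tags));
    mcp.setChangeType(ChangeType.UPSERT);
    return mcp;
  }
}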
@@ -0,0 +1,188 @@
package io.datahubproject.iceberg.catalog.rest.secure;

import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ENTITY_NAME;
import static io.datahubproject.iceberg.catalog.Utils.*;

import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.datahub.authorization.AuthUtil;
import com.datahub.authorization.EntitySpec;
import com.datahub.plugins.auth.authorization.Authorizer;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.EntitySearchService;
import io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse;
import io.datahubproject.iceberg.catalog.DataHubRestCatalog;
import io.datahubproject.iceberg.catalog.DataOperation;
import io.datahubproject.iceberg.catalog.credentials.CredentialProvider;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RequestContext;
import io.datahubproject.metadata.services.SecretService;
import jakarta.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import javax.inject.Inject;
import javax.inject.Named;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
public class AbstractIcebergController {
@Autowired protected EntityService entityService;
@Autowired private EntitySearchService searchService;
@Autowired private SecretService secretService;

@Inject
@Named("cachingCredentialProvider")
private CredentialProvider cachingCredentialProvider;

@Inject
@Named("authorizerChain")
private Authorizer authorizer;

@Inject
@Named("systemOperationContext")
protected OperationContext systemOperationContext;

protected PoliciesConfig.Privilege authorize(
OperationContext operationContext,
DataHubIcebergWarehouse warehouse,
TableIdentifier tableIdentifier,
DataOperation operation,
boolean returnHighestPrivilege) {
Optional<DatasetUrn> urn = warehouse.getDatasetUrn(tableIdentifier);
if (urn.isEmpty()) {
throw noSuchEntityException(warehouse.getPlatformInstance(), tableIdentifier);
}

EntitySpec entitySpec = new EntitySpec(DATASET_ENTITY_NAME, urn.get().toString());
try {
return authorize(
operationContext,
entitySpec,
platformInstanceEntitySpec(warehouse.getPlatformInstance()),
operation,
returnHighestPrivilege);
} catch (ForbiddenException e) {
// specify table id in error message instead of dataset-urn
throw new ForbiddenException(
"Data operation %s not authorized on %s",
operation, fullTableName(warehouse.getPlatformInstance(), tableIdentifier));
}
}

protected PoliciesConfig.Privilege authorize(
OperationContext operationContext,
String platformInstance,
DataOperation operation,
boolean returnHighestPrivilege) {
EntitySpec entitySpec = platformInstanceEntitySpec(platformInstance);
return authorize(operationContext, entitySpec, entitySpec, operation, returnHighestPrivilege);
}

private EntitySpec platformInstanceEntitySpec(String platformInstance) {
Urn urn = platformInstanceUrn(platformInstance);
return new EntitySpec(DATA_PLATFORM_INSTANCE_ENTITY_NAME, urn.toString());
}

private PoliciesConfig.Privilege authorize(
OperationContext operationContext,
EntitySpec entitySpec,
EntitySpec platformInstanceEntitySpec,
DataOperation operation,
boolean returnHighestPrivilege) {
List<PoliciesConfig.Privilege> privileges =
returnHighestPrivilege ? operation.descendingPrivileges : operation.ascendingPrivileges;

for (PoliciesConfig.Privilege privilege : privileges) {
      if ((entitySpec.getType().equals(DATASET_ENTITY_NAME)
              && PoliciesConfig.DATASET_PRIVILEGES.getPrivileges().contains(privilege))
          || (entitySpec.getType().equals(DATA_PLATFORM_INSTANCE_ENTITY_NAME)
              && PoliciesConfig.PLATFORM_INSTANCE_PRIVILEGES
                  .getPrivileges()
                  .contains(privilege))) {
if (AuthUtil.isAuthorized(operationContext, privilege, entitySpec)) {
return privilege;
}
} else if (entitySpec.getType().equals(DATASET_ENTITY_NAME)
&& PoliciesConfig.PLATFORM_INSTANCE_PRIVILEGES.getPrivileges().contains(privilege)) {
if (AuthUtil.isAuthorized(operationContext, privilege, platformInstanceEntitySpec)) {
return privilege;
}
}
}

throw new ForbiddenException("Data operation %s not authorized on %s", operation, entitySpec);
}

@Data
@AllArgsConstructor
protected static class CatalogOperationResult<R> {
private R response;
private PoliciesConfig.Privilege privilege;
private CredentialProvider.StorageProviderCredentials storageProviderCredentials;
}

protected <R> R catalogOperation(
String platformInstance, Function<DataHubRestCatalog, R> function) {
DataHubIcebergWarehouse warehouse =
DataHubIcebergWarehouse.of(
platformInstance, entityService, secretService, systemOperationContext);
return catalogOperation(warehouse, systemOperationContext, function);
}

protected <R> R catalogOperation(
DataHubIcebergWarehouse warehouse,
OperationContext operationContext,
Function<DataHubRestCatalog, R> function) {
DataHubRestCatalog catalog = catalog(operationContext, warehouse);
try {
return function.apply(catalog);
} finally {
try {
catalog.close();
} catch (IOException e) {
log.error("Error while closing catalog", e);
}
}
}

protected OperationContext opContext(HttpServletRequest request) {
Authentication auth = AuthenticationContext.getAuthentication();
return OperationContext.asSession(
systemOperationContext,
RequestContext.builder()
.buildOpenapi(auth.getActor().toUrnStr(), request, "icebergDataAction", "dataset"),
authorizer,
auth,
true);
}

protected DataHubRestCatalog catalog(
OperationContext operationContext, DataHubIcebergWarehouse warehouse) {
DataHubRestCatalog catalog =
new DataHubRestCatalog(
entityService, searchService, operationContext, warehouse, cachingCredentialProvider);
return catalog;
}

protected DataHubIcebergWarehouse warehouse(
String platformInstance, OperationContext operationContext) {
return DataHubIcebergWarehouse.of(
platformInstance, entityService, secretService, operationContext);
}

protected RuntimeException noSuchEntityException(
String platformInstance, TableIdentifier tableIdentifier) {
throw new UnsupportedOperationException();
}
}
@@ -0,0 +1,24 @@
package io.datahubproject.iceberg.catalog.rest.secure;

import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

@Slf4j
@RestController
@RequestMapping("/iceberg")
public class IcebergApiController extends AbstractIcebergController {

@PostMapping(
value = "/v1/{prefix}/transactions/commit",
consumes = MediaType.APPLICATION_JSON_VALUE)
public void commit(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@RequestBody CommitTransactionRequest commitTransactionRequest) {
log.info("COMMIT REQUEST {} ", commitTransactionRequest);
throw new UnsupportedOperationException();
}
}
@@ -0,0 +1,25 @@
package io.datahubproject.iceberg.catalog.rest.secure;

import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

@Slf4j
@RestController
@RequestMapping("/iceberg")
public class IcebergConfigApiController extends AbstractIcebergController {
@GetMapping(value = "/v1/config", produces = MediaType.APPLICATION_JSON_VALUE)
public ConfigResponse getConfig(
HttpServletRequest request,
@RequestParam(value = "warehouse", required = true) String warehouse) {
log.info("GET CONFIG for warehouse {}", warehouse);

// check that warehouse exists
warehouse(warehouse, opContext(request));
ConfigResponse response = ConfigResponse.builder().withOverride("prefix", warehouse).build();
log.info("GET CONFIG response: {}", response);
return response;
}
}
@@ -0,0 +1,171 @@
package io.datahubproject.iceberg.catalog.rest.secure;

import static io.datahubproject.iceberg.catalog.Utils.*;

import io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse;
import io.datahubproject.iceberg.catalog.DataOperation;
import io.datahubproject.metadata.context.OperationContext;
import jakarta.servlet.http.HttpServletRequest;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

@Slf4j
@RestController
@RequestMapping("/iceberg")
public class IcebergNamespaceApiController extends AbstractIcebergController {

@GetMapping(
value = "/v1/{prefix}/namespaces/{namespace}",
produces = MediaType.APPLICATION_JSON_VALUE)
public GetNamespaceResponse getNamespace(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace) {
log.info("GET NAMESPACE REQUEST {}.{}", platformInstance, namespace);

OperationContext operationContext = opContext(request);
// not authorizing get/use namespace operation currently
DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);

GetNamespaceResponse getNamespaceResponse =
catalogOperation(
warehouse,
operationContext,
catalog -> CatalogHandlers.loadNamespace(catalog, namespaceFromString(namespace)));

log.info("GET NAMESPACE RESPONSE {}", getNamespaceResponse);
return getNamespaceResponse;
}

@PostMapping(
value = "/v1/{prefix}/namespaces",
consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
public CreateNamespaceResponse createNamespace(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@RequestBody @Nonnull CreateNamespaceRequest createNamespaceRequest) {
log.info(
"CREATE NAMESPACE REQUEST in platformInstance {}, body {}",
platformInstance,
createNamespaceRequest);

OperationContext operationContext = opContext(request);

authorize(operationContext, platformInstance, DataOperation.MANAGE_NAMESPACES, false);
DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);
CreateNamespaceResponse createNamespaceResponse =
catalogOperation(
warehouse,
operationContext,
catalog -> {
CatalogHandlers.createNamespace(catalog, createNamespaceRequest);
return CreateNamespaceResponse.builder()
.withNamespace(createNamespaceRequest.namespace())
.build();
});

log.info("CREATE NAMESPACE RESPONSE {}", createNamespaceResponse);
return createNamespaceResponse;
}

@PostMapping(
value = "/v1/{prefix}/namespaces/{namespace}",
consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
public UpdateNamespacePropertiesResponse updateNamespace(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace,
@RequestBody @Nonnull UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {
log.info(
"UPDATE NAMESPACE REQUEST {}.{}, body {}",
platformInstance,
namespace,
updateNamespacePropertiesRequest);

OperationContext operationContext = opContext(request);

authorize(operationContext, platformInstance, DataOperation.MANAGE_NAMESPACES, false);
DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);
UpdateNamespacePropertiesResponse updateNamespaceResponse =
catalogOperation(
warehouse,
operationContext,
catalog ->
catalog.updateNamespaceProperties(
namespaceFromString(namespace), updateNamespacePropertiesRequest));

log.info("UPDATE NAMESPACE RESPONSE {}", updateNamespaceResponse);
return updateNamespaceResponse;
}

@DeleteMapping(
value = "/v1/{prefix}/namespaces/{namespace}",
consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
public void dropNamespace(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace) {
log.info("DROP NAMESPACE REQUEST {}.{}", platformInstance, namespace);

OperationContext operationContext = opContext(request);

authorize(operationContext, platformInstance, DataOperation.MANAGE_NAMESPACES, false);
DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);
catalogOperation(
warehouse,
operationContext,
catalog -> {
CatalogHandlers.dropNamespace(catalog, namespaceFromString(namespace));
return null;
});

log.info("DROPPED NAMESPACE {}", namespace);
}

@GetMapping(value = "/v1/{prefix}/namespaces", produces = MediaType.APPLICATION_JSON_VALUE)
public ListNamespacesResponse listNamespaces(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@RequestParam(value = "parent", required = false) String parent,
@RequestParam(value = "pageToken", required = false) String pageToken,
@RequestParam(value = "pageSize", required = false) Integer pageSize) {
log.info("LIST NAMESPACES REQUEST for {}.{}", platformInstance, parent);

OperationContext operationContext = opContext(request);
authorize(operationContext, platformInstance, DataOperation.LIST, false);

DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);

ListNamespacesResponse listNamespacesResponse =
catalogOperation(
warehouse,
operationContext,
catalog -> {
Namespace ns;
if (StringUtils.isEmpty(parent)) {
ns = Namespace.empty();
} else {
ns = namespaceFromString(parent);
// ensure namespace exists
catalog.loadNamespaceMetadata(ns);
}
return CatalogHandlers.listNamespaces(catalog, ns);
});
log.info("LIST NAMESPACES RESPONSE {}", listNamespacesResponse);
return listNamespacesResponse;
}
}
@@ -0,0 +1,323 @@
package io.datahubproject.iceberg.catalog.rest.secure;

import static io.datahubproject.iceberg.catalog.Utils.*;

import com.linkedin.metadata.authorization.PoliciesConfig;
import io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse;
import io.datahubproject.iceberg.catalog.DataOperation;
import io.datahubproject.iceberg.catalog.credentials.CredentialProvider;
import io.datahubproject.metadata.context.OperationContext;
import jakarta.servlet.http.HttpServletRequest;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

@Slf4j
@RestController
@RequestMapping("/iceberg")
public class IcebergTableApiController extends AbstractIcebergController {

@Autowired private CredentialProvider credentialProvider;

@PostMapping(
value = "/v1/{prefix}/namespaces/{namespace}/tables",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public LoadTableResponse createTable(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace,
@RequestBody CreateTableRequest createTableRequest,
@RequestHeader(value = "X-Iceberg-Access-Delegation") String xIcebergAccessDelegation) {
log.info(
"CREATE TABLE REQUEST in {}.{}, body {}", platformInstance, namespace, createTableRequest);

OperationContext operationContext = opContext(request);
PoliciesConfig.Privilege privilege =
authorize(operationContext, platformInstance, DataOperation.MANAGE_TABLES, false);

DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);
LoadTableResponse createTableResponse =
catalogOperation(
warehouse,
operationContext,
catalog -> {
// ensure namespace exists
Namespace ns = namespaceFromString(namespace);
catalog.loadNamespaceMetadata(ns);
if (createTableRequest.stageCreate()) {
return CatalogHandlers.stageTableCreate(catalog, ns, createTableRequest);
} else {
return CatalogHandlers.createTable(catalog, ns, createTableRequest);
}
});
log.info("CREATE TABLE RESPONSE, excluding creds, {}", createTableResponse);
return includeCreds(
platformInstance,
xIcebergAccessDelegation,
createTableResponse,
PoliciesConfig.DATA_READ_WRITE_PRIVILEGE,
warehouse.getStorageProviderCredentials());
}

private LoadTableResponse includeCreds(
String platformInstance,
String xIcebergAccessDelegation,
LoadTableResponse loadTableResponse,
PoliciesConfig.Privilege privilege,
CredentialProvider.StorageProviderCredentials storageProviderCredentials) {
if ("vended-credentials".equals(xIcebergAccessDelegation)) {
CredentialProvider.CredentialsCacheKey cacheKey =
new CredentialProvider.CredentialsCacheKey(
platformInstance, privilege, locations(loadTableResponse.tableMetadata()));
Map<String, String> creds =
credentialProvider.getCredentials(cacheKey, storageProviderCredentials);
/* log.info(
"STS creds {} for primary table location {}",
creds,
loadTableResponse.tableMetadata().location()); */

return LoadTableResponse.builder()
.withTableMetadata(loadTableResponse.tableMetadata())
.addAllConfig(creds)
.build();
} else {
return loadTableResponse;
}
}

@GetMapping(
value = "/v1/{prefix}/namespaces/{namespace}/tables/{table}",
produces = MediaType.APPLICATION_JSON_VALUE)
public LoadTableResponse loadTable(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace,
@PathVariable("table") String table,
@RequestHeader(value = "X-Iceberg-Access-Delegation", required = false)
String xIcebergAccessDelegation,
@RequestParam(value = "snapshots", required = false) String snapshots) {
log.info(
"GET TABLE REQUEST {}.{}.{}, access-delegation {}",
platformInstance,
namespace,
table,
xIcebergAccessDelegation);

OperationContext operationContext = opContext(request);
DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);

PoliciesConfig.Privilege privilege =
authorize(
operationContext,
warehouse,
tableIdFromString(namespace, table),
DataOperation.READ_ONLY,
true);

LoadTableResponse getTableResponse =
catalogOperation(
warehouse,
operationContext,
catalog -> CatalogHandlers.loadTable(catalog, tableIdFromString(namespace, table)));
log.info("GET TABLE RESPONSE, excluding creds, {}", getTableResponse);

if (privilege == PoliciesConfig.DATA_MANAGE_TABLES_PRIVILEGE) {
privilege = PoliciesConfig.DATA_READ_WRITE_PRIVILEGE;
} else if (privilege == PoliciesConfig.DATA_MANAGE_VIEWS_PRIVILEGE) {
privilege = PoliciesConfig.DATA_READ_ONLY_PRIVILEGE;
}
return includeCreds(
platformInstance,
xIcebergAccessDelegation,
getTableResponse,
privilege,
warehouse.getStorageProviderCredentials());
}

@PostMapping(
value = "/v1/{prefix}/namespaces/{namespace}/tables/{table}",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public LoadTableResponse updateTable(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace,
@PathVariable("table") String table,
@RequestBody UpdateTableRequest updateTableRequest) {

log.info(
"UPDATE TABLE REQUEST {}.{}.{}, body {} ",
platformInstance,
namespace,
table,
updateTableRequest);

OperationContext operationContext = opContext(request);
DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);
authorize(
operationContext,
warehouse,
tableIdFromString(namespace, table),
DataOperation.READ_WRITE,
false);
LoadTableResponse updateTableResponse =
catalogOperation(
warehouse,
operationContext,
catalog ->
CatalogHandlers.updateTable(
catalog, tableIdFromString(namespace, table), updateTableRequest));

// not refreshing credentials here.
log.info("UPDATE TABLE RESPONSE {}", updateTableResponse);

return updateTableResponse;
}

@DeleteMapping(value = "/v1/{prefix}/namespaces/{namespace}/tables/{table}")
public void dropTable(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace,
@PathVariable("table") String table,
@RequestParam(value = "purgeRequested", defaultValue = "false") Boolean purgeRequested) {

log.info(
"DROP TABLE REQUEST {}.{}.{}, purge = {}",
platformInstance,
namespace,
table,
purgeRequested);

OperationContext operationContext = opContext(request);
authorize(operationContext, platformInstance, DataOperation.MANAGE_TABLES, false);

DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);

catalogOperation(
warehouse,
operationContext,
catalog -> {
TableIdentifier tableIdentifier = tableIdFromString(namespace, table);
if (purgeRequested) {
CatalogHandlers.purgeTable(catalog, tableIdentifier);
log.info("PURGED TABLE {}", tableIdentifier);
} else {
CatalogHandlers.dropTable(catalog, tableIdentifier);
log.info("DROPPED TABLE {}", tableIdentifier);
}
return null;
});
}

@PostMapping(
value = "/v1/{prefix}/tables/rename",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public void renameTable(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@RequestBody RenameTableRequest renameTableRequest) {
log.info(
"RENAME TABLE REQUEST in platformInstance {}, body {}",
platformInstance,
renameTableRequest);

OperationContext operationContext = opContext(request);
authorize(operationContext, platformInstance, DataOperation.MANAGE_TABLES, false);

DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);

catalogOperation(
warehouse,
operationContext,
catalog -> {
CatalogHandlers.renameTable(catalog, renameTableRequest);
return null;
});

log.info(
"RENAMED TABLE {} to {} ", renameTableRequest.source(), renameTableRequest.destination());
}

@PostMapping(
value = "/v1/{prefix}/namespaces/{namespace}/register",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public LoadTableResponse registerTable(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace,
@RequestBody RegisterTableRequest registerTableRequest) {
log.info(
"REGISTER TABLE REQUEST {}.{}, body {}", platformInstance, namespace, registerTableRequest);

OperationContext operationContext = opContext(request);
authorize(operationContext, platformInstance, DataOperation.MANAGE_TABLES, false);

DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);

LoadTableResponse registerTableResponse =
catalogOperation(
warehouse,
operationContext,
catalog -> {
// ensure namespace exists
Namespace ns = namespaceFromString(namespace);
catalog.loadNamespaceMetadata(ns);
return CatalogHandlers.registerTable(catalog, ns, registerTableRequest);
});

log.info("REGISTER TABLE RESPONSE {}", registerTableResponse);
return registerTableResponse;
}

@GetMapping(
value = "/v1/{prefix}/namespaces/{namespace}/tables",
produces = MediaType.APPLICATION_JSON_VALUE)
public ListTablesResponse listTables(
HttpServletRequest request,
@PathVariable("prefix") String platformInstance,
@PathVariable("namespace") String namespace,
@RequestParam(value = "pageToken", required = false) String pageToken,
@RequestParam(value = "pageSize", required = false) Integer pageSize) {
log.info("LIST TABLES REQUEST for {}.{}", platformInstance, namespace);

OperationContext operationContext = opContext(request);
authorize(operationContext, platformInstance, DataOperation.LIST, false);
DataHubIcebergWarehouse warehouse = warehouse(platformInstance, operationContext);

ListTablesResponse listTablesResponse =
catalogOperation(
warehouse,
operationContext,
catalog -> {
// ensure namespace exists
Namespace ns = namespaceFromString(namespace);
catalog.loadNamespaceMetadata(ns);
return CatalogHandlers.listTables(catalog, ns);
});
log.info("LIST TABLES RESPONSE {}", listTablesResponse);
return listTablesResponse;
}

@Override
protected NoSuchTableException noSuchEntityException(
String platformInstance, TableIdentifier tableIdentifier) {
return new NoSuchTableException(
"No such table %s", fullTableName(platformInstance, tableIdentifier));
}
}