Skip to content

Commit

Permalink
Catalog/S3: Adopt S3 signing to new object-storage layout (Iceberg 1.…
Browse files Browse the repository at this point in the history
…7.0) (#9896)
  • Loading branch information
snazy authored Nov 11, 2024
1 parent 718d489 commit 912bdea
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.projectnessie.versioned.RequestMeta.apiRead;
import static org.projectnessie.versioned.RequestMeta.apiWrite;

import com.google.common.annotations.VisibleForTesting;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.ws.rs.core.Response.Status;
Expand All @@ -36,6 +37,8 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.immutables.value.Value;
import org.immutables.value.Value.Check;
Expand Down Expand Up @@ -63,6 +66,16 @@
@Value.Immutable
abstract class IcebergS3SignParams {

/**
* Pattern for the new object-storage path layout introduced in Apache Iceberg 1.7.0: four
* consecutive path elements of 4, 4, 4 and 8 characters that contain only {@code 0} or {@code 1},
* each followed by a {@code /}.
*
* <p>Capturing group 1 holds everything that follows those four elements. The expression contains
* no anchors, so {@link Matcher#find()} also reports a match when the four elements occur
* somewhere other than at the start of the input.
*
* <p>Example: {@code
* s3://bucket1/warehouse/1000/1000/1110/10001000/newdb/table_949afb2c-ed93-4702-b390-f1d4a9c59957/my-data-file.txt}
*/
private static final Pattern NEW_OBJECT_STORAGE_LAYOUT =
Pattern.compile("[01]{4}/[01]{4}/[01]{4}/[01]{8}/(.*)");

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergS3SignParams.class);

abstract IcebergS3SignRequest request();
Expand Down Expand Up @@ -117,33 +130,49 @@ Uni<IcebergS3SignResponse> verifyAndSign() {
}

/**
 * Checks whether the S3 URI requested for signing is covered by the given location.
 *
 * <p>Delegates to the static three-argument overload, passing this instance's {@code
 * warehouseLocation()} and {@code requestedS3Uri()}.
 *
 * @param location candidate location to check the requested URI against
 * @return {@code true} if the static overload accepts the requested URI for {@code location}
 */
private boolean checkLocation(String location) {
  return checkLocation(warehouseLocation(), requestedS3Uri(), location);
}

@VisibleForTesting
static boolean checkLocation(String warehouseLocation, String requested, String location) {
if (requested.startsWith(location)) {
return true;
}

// For files that were written with 'write.object-storage.enabled' enabled, repeat the check but
// ignore the first S3 path element after the warehouse location

String warehouseLocation = warehouseLocation();
if (warehouseLocation.isEmpty()) {
int warehouseLocationLength = warehouseLocation.length();
if (warehouseLocationLength == 0) {
return false;
}

if (!requested.startsWith(warehouseLocation)) {
return false;
if (!warehouseLocation.endsWith("/")) {
warehouseLocation += "/";
warehouseLocationLength++;
}
if (!location.startsWith(warehouseLocation)) {
return false;
if (!location.endsWith("/")) {
location += "/";
}

int requestedSlash = requested.indexOf('/', warehouseLocation.length() + 1);
if (requestedSlash == -1) {
if (!requested.startsWith(warehouseLocation) || !location.startsWith(warehouseLocation)) {
return false;
}
String requestedPath = requested.substring(requestedSlash);
String locationPath = location.substring(warehouseLocation.length());

String requestedPath = requested.substring(warehouseLocationLength);

Matcher newObjectStorageLayoutMatcher = NEW_OBJECT_STORAGE_LAYOUT.matcher(requestedPath);
if (newObjectStorageLayoutMatcher.find()) {
requestedPath = newObjectStorageLayoutMatcher.group(1);
} else {
int requestedSlash = requestedPath.indexOf('/');
if (requestedSlash == -1) {
return false;
}
requestedPath = requestedPath.substring(requestedSlash + 1);
}

String locationPath = location.substring(warehouseLocationLength);
return requestedPath.startsWith(locationPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.projectnessie.catalog.service.rest;

import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
Expand All @@ -31,9 +32,15 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand Down Expand Up @@ -62,21 +69,37 @@
import org.projectnessie.model.Reference.ReferenceType;
import org.projectnessie.versioned.RequestMeta;

@ExtendWith(MockitoExtension.class)
@ExtendWith({MockitoExtension.class, SoftAssertionsExtension.class})
class TestIcebergS3SignParams {

@InjectSoftAssertions SoftAssertions soft;
@Mock CatalogService catalogService;
@Mock RequestSigner signer;

final String warehouseLocation = "s3://bucket/warehouse/";
final String baseLocation = warehouseLocation + "ns/table1_cafebabe";
final String oldBaseLocation = "s3://old-bucket/ns/table1";
final String metadataLocation = baseLocation + "/metadata/metadata.json";
final String dataFileUri =
"https://bucket.s3.amazonaws.com/warehouse/ns/table1_cafebabe/data/file1.parquet";
final String oldDataFileUri = "https://old-bucket.s3.amazonaws.com/ns/table1/data/file1.parquet";
final String metadataJsonUri =
"https://bucket.s3.amazonaws.com/warehouse/ns/table1_cafebabe/metadata/metadata.json";
// Warehouse root as an s3:// URI, including the trailing '/'.
static final String warehouseLocation = "s3://bucket/warehouse/";
// Namespace + table directory, relative to the warehouse root.
static final String locationPart = "ns/table1_cafebabe";

// Object-storage path part as generated before Iceberg 1.7.0 (a single path element)
static final String objectStoragePartOld = "iI5Yww";
// Object-storage path part as generated since Iceberg 1.7.0 (four path elements of '0'/'1')
static final String objectStoragePartNew = "1000/1000/1110/10001000";

// Table base location below the warehouse root.
static final String baseLocation = warehouseLocation + locationPart;
// A table location in a different bucket ("old-bucket").
static final String oldBaseLocation = "s3://old-bucket/ns/table1";
static final String metadataLocation = baseLocation + "/metadata/metadata.json";

// https:// URI prefix that addresses the "warehouse/" prefix of bucket "bucket".
static final String s3BucketAws = "https://bucket.s3.amazonaws.com/warehouse/";
// Data file URI without any object-storage path part.
static final String dataFileUri = s3BucketAws + locationPart + "/data/file1.parquet";
// Same data file, with the pre-1.7.0 object-storage part between warehouse and table location.
static final String dataFileUriObjectStorageOld =
s3BucketAws + objectStoragePartOld + "/" + locationPart + "/data/file1.parquet";
// Same data file, with the 1.7.0+ object-storage part between warehouse and table location.
static final String dataFileUriObjectStorageNew =
s3BucketAws + objectStoragePartNew + "/" + locationPart + "/data/file1.parquet";

// Data file URI below the "old-bucket" table location.
static final String oldDataFileUri =
"https://old-bucket.s3.amazonaws.com/ns/table1/data/file1.parquet";

static final String metadataJsonUri = s3BucketAws + locationPart + "/metadata/metadata.json";

final IcebergS3SignRequest writeRequest =
IcebergS3SignRequest.builder()
.method("DELETE")
Expand Down Expand Up @@ -123,31 +146,110 @@ class TestIcebergS3SignParams {
final SigningResponse signingResponse =
ImmutableSigningResponse.builder().uri(URI.create(dataFileUri)).build();

/** Argument source: each of the HTTP methods GET, HEAD, OPTIONS, TRACE paired with each URI from {@code addUris}. */
static Stream<Arguments> readMethodsAndUris() {
  Stream<String> readMethods = Stream.of("GET", "HEAD", "OPTIONS", "TRACE");
  return readMethods.flatMap(httpMethod -> addUris(httpMethod));
}

/** Argument source: each of the HTTP methods PUT, POST, DELETE, PATCH paired with each URI from {@code addUris}. */
static Stream<Arguments> writeMethodsAndUris() {
  Stream<String> writeMethods = Stream.of("PUT", "POST", "DELETE", "PATCH");
  return writeMethods.flatMap(httpMethod -> addUris(httpMethod));
}

/**
 * Pairs {@code method} with the three data file URIs: the plain one, the one using the old
 * object-storage layout and the one using the new object-storage layout (in that order).
 */
static Stream<Arguments> addUris(String method) {
  Stream<String> dataFileUris =
      Stream.of(dataFileUri, dataFileUriObjectStorageOld, dataFileUriObjectStorageNew);
  return dataFileUris.map(uri -> arguments(method, uri));
}

/**
 * Verifies that {@code IcebergS3SignParams.checkLocation} accepts a data file URI without an
 * object-storage part, with the old object-storage part and with the new object-storage part,
 * for every combination of warehouse location and table location given with and without a
 * trailing {@code /}.
 */
@Test
void checkLocation() {
  // Strip trailing '/'
  String warehouseWithSlash = TestIcebergS3SignParams.warehouseLocation;
  String warehouse = warehouseWithSlash.substring(0, warehouseWithSlash.length() - 1);

  String tableLocation = warehouse + '/' + locationPart;
  String tableRelativeFile = '/' + locationPart + "/data/file1.parquet";

  String[] requestedUris = {
    warehouse + tableRelativeFile,
    warehouse + '/' + objectStoragePartOld + tableRelativeFile,
    warehouse + '/' + objectStoragePartNew + tableRelativeFile
  };
  String[] warehouseVariants = {warehouse, warehouse + '/'};
  String[] locationVariants = {tableLocation, tableLocation + '/'};

  for (String requested : requestedUris) {
    for (String warehouseVariant : warehouseVariants) {
      for (String locationVariant : locationVariants) {
        soft.assertThat(
                IcebergS3SignParams.checkLocation(warehouseVariant, requested, locationVariant))
            .isTrue();
      }
    }
  }
}

@ParameterizedTest
@ValueSource(strings = {"GET", "HEAD", "OPTIONS", "TRACE"})
void verifyAndSignSuccessRead(String method) throws Exception {
@MethodSource("readMethodsAndUris")
void verifyAndSignSuccessRead(String method, String uri) throws Exception {
when(catalogService.retrieveSnapshot(
any(), eq(key), isNull(), eq(expectedApiRead(key)), eq(ICEBERG_V1)))
.thenReturn(successStage);
when(signer.sign(any())).thenReturn(signingResponse);
IcebergS3SignParams icebergSigner =
newBuilder()
.request(IcebergS3SignRequest.builder().from(readRequest).method(method).build())
.request(
IcebergS3SignRequest.builder().from(readRequest).uri(uri).method(method).build())
.build();
Uni<IcebergS3SignResponse> response = icebergSigner.verifyAndSign();
expectSuccess(response);
}

@ParameterizedTest
@ValueSource(strings = {"PUT", "POST", "DELETE", "PATCH"})
void verifyAndSignSuccessWrite(String method) throws Exception {
@MethodSource("writeMethodsAndUris")
void verifyAndSignSuccessWrite(String method, String uri) throws Exception {
when(catalogService.retrieveSnapshot(
any(), eq(key), isNull(), eq(expectedApiWrite(key)), eq(ICEBERG_V1)))
.thenReturn(successStage);
when(signer.sign(any())).thenReturn(signingResponse);
IcebergS3SignParams icebergSigner =
newBuilder()
.request(IcebergS3SignRequest.builder().from(writeRequest).method(method).build())
.request(
IcebergS3SignRequest.builder().from(writeRequest).uri(uri).method(method).build())
.build();
Uni<IcebergS3SignResponse> response = icebergSigner.verifyAndSign();
expectSuccess(response);
Expand Down Expand Up @@ -299,7 +401,7 @@ void verifyAndSignFailureWrongBaseLocation() throws Exception {
.thenReturn(successStage);
IcebergS3SignParams icebergSigner =
newBuilder()
.writeLocations(List.of("s3://wrong-bucket/warehouse/ns/table1_cafebabee"))
.writeLocations(List.of("s3://wrong-bucket/warehouse/" + locationPart + "e"))
.build();
Uni<IcebergS3SignResponse> response = icebergSigner.verifyAndSign();
expectFailure(response, "URI not allowed for signing: " + dataFileUri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import io.quarkus.test.junit.QuarkusTestProfile;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
Expand Down Expand Up @@ -133,9 +135,26 @@ public void testTableWithObjectStorage() throws Exception {

assertThat(dataLocation).startsWith(writeDataPath + "/");
String path = dataLocation.substring(writeDataPath.length() + 1);
int idx = path.indexOf('/');
assertThat(idx).isGreaterThan(1);
String pathNoRandom = path.substring(idx + 1);
// Before Iceberg 1.7.0:
// dataLocation ==
// s3://bucket1/warehouse/iI5Yww/newdb/table_5256a122-69b3-4ec2-a6ce-f98f9ce509bf/my-data-file.txt
// path == iI5Yww/newdb/table_5256a122-69b3-4ec2-a6ce-f98f9ce509bf/my-data-file.txt
// pathNoRandom == newdb/table_5256a122-69b3-4ec2-a6ce-f98f9ce509bf/my-data-file.txt
// Since Iceberg 1.7.0:
// dataLocation ==
// s3://bucket1/warehouse/1000/1000/1110/10001000/newdb/table_949afb2c-ed93-4702-b390-f1d4a9c59957/my-data-file.txt
// path ==
// 1000/1000/1110/10001000/newdb/table_949afb2c-ed93-4702-b390-f1d4a9c59957/my-data-file.txt
Pattern patternNewObjectStorageLayout = Pattern.compile("[01]{4}/[01]{4}/[01]{4}/[01]{8}/(.*)");
Matcher matcherNewObjectStorageLayout = patternNewObjectStorageLayout.matcher(path);
String pathNoRandom;
if (matcherNewObjectStorageLayout.find()) {
pathNoRandom = matcherNewObjectStorageLayout.group(1);
} else {
int idx = path.indexOf('/');
assertThat(idx).isGreaterThan(1);
pathNoRandom = path.substring(idx + 1);
}
assertThat(pathNoRandom)
.startsWith(namespace + '/' + tableId.name() + '_')
.endsWith('/' + filename);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.Set;
import java.util.UUID;
import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
Expand Down Expand Up @@ -491,9 +493,26 @@ public void testTableWithObjectStorage() throws Exception {

assertThat(dataLocation).startsWith(writeDataPath + "/");
String path = dataLocation.substring(writeDataPath.length() + 1);
int idx = path.indexOf('/');
assertThat(idx).isGreaterThan(1);
String pathNoRandom = path.substring(idx + 1);
// Before Iceberg 1.7.0:
// dataLocation ==
// s3://bucket1/warehouse/iI5Yww/newdb/table_5256a122-69b3-4ec2-a6ce-f98f9ce509bf/my-data-file.txt
// path == iI5Yww/newdb/table_5256a122-69b3-4ec2-a6ce-f98f9ce509bf/my-data-file.txt
// pathNoRandom == newdb/table_5256a122-69b3-4ec2-a6ce-f98f9ce509bf/my-data-file.txt
// Since Iceberg 1.7.0:
// dataLocation ==
// s3://bucket1/warehouse/1000/1000/1110/10001000/newdb/table_949afb2c-ed93-4702-b390-f1d4a9c59957/my-data-file.txt
// path ==
// 1000/1000/1110/10001000/newdb/table_949afb2c-ed93-4702-b390-f1d4a9c59957/my-data-file.txt
Pattern patternNewObjectStorageLayout = Pattern.compile("[01]{4}/[01]{4}/[01]{4}/[01]{8}/(.*)");
Matcher matcherNewObjectStorageLayout = patternNewObjectStorageLayout.matcher(path);
String pathNoRandom;
if (matcherNewObjectStorageLayout.find()) {
pathNoRandom = matcherNewObjectStorageLayout.group(1);
} else {
int idx = path.indexOf('/');
assertThat(idx).isGreaterThan(1);
pathNoRandom = path.substring(idx + 1);
}
assertThat(pathNoRandom)
.startsWith(String.join("/", TABLE.namespace().levels()) + '/' + TABLE.name() + '_')
.endsWith('/' + filename);
Expand Down

0 comments on commit 912bdea

Please sign in to comment.