Merge branch 'main' into feat-quite-command
Abyss-lord authored Jan 16, 2025
2 parents f43088f + e6225a0 commit 3029125
Showing 114 changed files with 5,117 additions and 717 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/gvfs-fuse-build-test.yml
@@ -71,10 +71,18 @@ jobs:
run: |
dev/ci/check_commands.sh
- name: Build and test Gravitino
- name: Build Gvfs-fuse
run: |
./gradlew :clients:filesystem-fuse:build -PenableFuse=true
- name: Integration test
run: |
./gradlew build -x :clients:client-python:build -x test -x web -PjdkVersion=${{ matrix.java-version }}
./gradlew compileDistribution -x :clients:client-python:build -x test -x web -PjdkVersion=${{ matrix.java-version }}
cd clients/filesystem-fuse
make test-s3
make test-fuse-it
- name: Free up disk space
run: |
dev/ci/util_free_space.sh
@@ -85,5 +93,7 @@ jobs:
with:
name: Gvfs-fuse integrate-test-reports-${{ matrix.java-version }}
path: |
clients/filesystem-fuse/build/test/log/*.log
clients/filesystem-fuse/target/debug/fuse.log
distribution/package/logs/gravitino-server.out
distribution/package/logs/gravitino-server.log
1 change: 1 addition & 0 deletions LICENSE.bin
@@ -374,6 +374,7 @@
Apache Arrow
Rome
Jettison
Awaitility

This product bundles various third-party components also under the
Apache Software Foundation License 1.1
@@ -100,12 +100,14 @@ public RangerClientExtension(String hostName, String authType, String username,
}
}

@Override
public RangerPolicy createPolicy(RangerPolicy policy) throws RangerServiceException {
Preconditions.checkArgument(
policy.getResources().size() > 0, "Ranger policy resources can not be empty!");
return super.createPolicy(policy);
}

@Override
public RangerPolicy updatePolicy(long policyId, RangerPolicy policy)
throws RangerServiceException {
Preconditions.checkArgument(
@@ -60,6 +60,7 @@ public VXGroup() {
*
* @return formatedStr
*/
@Override
public String toString() {
String str = "VXGroup={";
str += super.toString();
@@ -75,6 +75,7 @@ public String getName() {
*
* @return formatedStr
*/
@Override
public String toString() {
String str = "VXUser={";
str += super.toString();
9 changes: 9 additions & 0 deletions bin/common.sh → bin/common.sh.template
@@ -42,6 +42,15 @@ if [[ -f "${GRAVITINO_CONF_DIR}/gravitino-env.sh" ]]; then
. "${GRAVITINO_CONF_DIR}/gravitino-env.sh"
fi

if [[ -z "${GRAVITINO_VERSION}" ]]; then
echo -e "GRAVITINO_VERSION is not set, you may need to:\n" \
"1. Ensure that a compiled version of Gravitino is available at " \
"\${GRAVITINO_HOME}/distribution/package. You may need to compile it first, " \
"if you are installing the software from source code.\n" \
"2. Execute gravitino.sh in the \${GRAVITINO_HOME}/distribution/package/bin directory."
exit 1
fi

GRAVITINO_CLASSPATH+=":${GRAVITINO_CONF_DIR}"

JVM_VERSION=8
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
@@ -54,6 +54,7 @@ dependencies {
exclude("org.fusesource.leveldbjni")
}
implementation(libs.slf4j.api)
implementation(libs.awaitility)

compileOnly(libs.guava)

@@ -31,6 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
@@ -71,6 +73,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -755,6 +759,35 @@ FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException
scheme, path, fileSystemProvidersMap.keySet(), fileSystemProvidersMap.values()));
}

return provider.getFileSystem(path, config);
int timeoutSeconds =
(int)
propertiesMetadata
.catalogPropertiesMetadata()
.getOrDefault(
config, HadoopCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
try {
AtomicReference<FileSystem> fileSystem = new AtomicReference<>();
Awaitility.await()
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.until(
() -> {
fileSystem.set(provider.getFileSystem(path, config));
return true;
});
return fileSystem.get();
} catch (ConditionTimeoutException e) {
throw new IOException(
String.format(
"Failed to get FileSystem for path: %s, scheme: %s, provider: %s, config: %s within %s "
+ "seconds, please check the configuration or increase the "
+ "file system connection timeout time by setting catalog property: %s",
path,
scheme,
provider,
config,
timeoutSeconds,
HadoopCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS),
e);
}
}
}
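
The hunk above replaces the bare `return provider.getFileSystem(path, config)` with an Awaitility guard, so a hung HDFS or object-store endpoint fails with a descriptive IOException instead of blocking the catalog thread indefinitely. Below is a minimal, self-contained sketch of the same pattern; only the Awaitility calls mirror the diff, and `slowFileSystemOpen()` is a hypothetical stand-in for `provider.getFileSystem(path, config)`:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;

public class FileSystemTimeoutSketch {

  public static void main(String[] args) {
    AtomicReference<String> handle = new AtomicReference<>();
    try {
      Awaitility.await()
          .atMost(6, TimeUnit.SECONDS) // mirrors DEFAULT_GET_FILESYSTEM_TIMEOUT_SECONDS
          .until(
              () -> {
                handle.set(slowFileSystemOpen()); // runs on Awaitility's poll thread
                return true; // first successful call ends the wait
              });
      System.out.println("Opened: " + handle.get());
    } catch (ConditionTimeoutException e) {
      // In the catalog this is rethrown as an IOException carrying the path,
      // scheme, provider, config, and the property name to tune.
      System.err.println("File system did not open within 6 seconds");
    }
  }

  // Hypothetical slow operation; a real caller would open the Hadoop FileSystem here.
  private static String slowFileSystemOpen() throws InterruptedException {
    Thread.sleep(100);
    return "hdfs-handle";
  }
}
```

Because the callable performs the real call and returns true on first success, `atMost` acts as an overall deadline rather than a repeated polling predicate.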
@@ -53,6 +53,9 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata
*/
public static final String DEFAULT_FS_PROVIDER = "default-filesystem-provider";

static final String FILESYSTEM_CONNECTION_TIMEOUT_SECONDS = "filesystem-conn-timeout-secs";
static final int DEFAULT_GET_FILESYSTEM_TIMEOUT_SECONDS = 6;

public static final String BUILTIN_LOCAL_FS_PROVIDER = "builtin-local";
public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs";

@@ -82,6 +85,14 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata
false /* immutable */,
BUILTIN_LOCAL_FS_PROVIDER, // please see LocalFileSystemProvider#name()
false /* hidden */))
.put(
FILESYSTEM_CONNECTION_TIMEOUT_SECONDS,
PropertyEntry.integerOptionalPropertyEntry(
FILESYSTEM_CONNECTION_TIMEOUT_SECONDS,
"Timeout to wait for to create the Hadoop file system client instance.",
false /* immutable */,
DEFAULT_GET_FILESYSTEM_TIMEOUT_SECONDS,
false /* hidden */))
// The following two are about authentication.
.putAll(KERBEROS_PROPERTY_ENTRIES)
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
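
The new `filesystem-conn-timeout-secs` entry makes that deadline user-tunable per catalog. A hypothetical sketch of raising it when assembling catalog properties — the key and the 6-second default come from this diff, while how the map reaches the server (REST call, client SDK) is deliberately left out:

```java
import java.util.HashMap;
import java.util.Map;

public class CatalogTimeoutProperties {
  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    // Raise the FileSystem connection timeout from the default 6s to 30s.
    properties.put("filesystem-conn-timeout-secs", "30");
    // Pass `properties` to the Hadoop catalog creation call of your choice.
    System.out.println(properties);
  }
}
```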
@@ -185,6 +185,7 @@ protected Map<String, String> getTableProperties(Connection connection, String tableName)
}
}

@Override
protected void correctJdbcTableFields(
Connection connection, String databaseName, String tableName, JdbcTable.Builder tableBuilder)
throws SQLException {
@@ -58,6 +58,7 @@
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
@@ -513,6 +514,13 @@ public Table createTable(
.build())
.toArray(IcebergColumn[]::new);

// Gravitino NONE distribution means the client side doesn't specify distribution, which is
// not the same as none distribution in Iceberg.
if (Distributions.NONE.equals(distribution)) {
distribution =
getIcebergDefaultDistribution(sortOrders.length > 0, partitioning.length > 0);
}

IcebergTable createdTable =
IcebergTable.builder()
.withName(tableIdent.name())
@@ -588,6 +596,16 @@ public void testConnection(
}
}

private static Distribution getIcebergDefaultDistribution(
boolean isSorted, boolean isPartitioned) {
if (isSorted) {
return Distributions.RANGE;
} else if (isPartitioned) {
return Distributions.HASH;
}
return Distributions.NONE;
}

private static String currentUser() {
return PrincipalUtils.getCurrentUserName();
}
@@ -152,21 +152,6 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableName
Schema schema = table.schema();
Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema);
SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder());
Distribution distribution = Distributions.NONE;
String distributionName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
if (null != distributionName) {
switch (DistributionMode.fromName(distributionName)) {
case HASH:
distribution = Distributions.HASH;
break;
case RANGE:
distribution = Distributions.RANGE;
break;
default:
// do nothing
break;
}
}
IcebergColumn[] icebergColumns =
schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new);
return IcebergTable.builder()
@@ -178,7 +163,7 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableName
.withAuditInfo(AuditInfo.EMPTY)
.withPartitioning(partitionSpec)
.withSortOrders(sortOrder)
.withDistribution(distribution)
.withDistribution(getDistribution(properties))
.build();
}

@@ -236,4 +221,23 @@ protected IcebergTable internalBuild() {
public static Builder builder() {
return new Builder();
}

private static Distribution getDistribution(Map<String, String> properties) {
Distribution distribution = Distributions.NONE;
String distributionName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
if (null != distributionName) {
switch (DistributionMode.fromName(distributionName)) {
case HASH:
distribution = Distributions.HASH;
break;
case RANGE:
distribution = Distributions.RANGE;
break;
default:
// do nothing
break;
}
}
return distribution;
}
}
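
Extracting `getDistribution(...)` leaves `fromIcebergTable` focused on schema conversion and isolates the property-to-distribution mapping. A standalone sketch of that mapping, assuming `IcebergTablePropertiesMetadata.DISTRIBUTION_MODE` resolves to Iceberg's standard `write.distribution-mode` table property; unknown or absent values fall back to NONE, matching the switch's default branch:

```java
import java.util.Map;

public class DistributionModeSketch {

  enum Distribution { NONE, HASH, RANGE }

  static Distribution fromProperties(Map<String, String> properties) {
    String mode = properties.get("write.distribution-mode");
    if (mode == null) {
      return Distribution.NONE; // property absent: client did not pick a mode
    }
    switch (mode.toLowerCase()) {
      case "hash":
        return Distribution.HASH;
      case "range":
        return Distribution.RANGE;
      default:
        return Distribution.NONE; // "none" and anything unrecognized
    }
  }

  public static void main(String[] args) {
    System.out.println(fromProperties(Map.of("write.distribution-mode", "hash")));  // HASH
    System.out.println(fromProperties(Map.of("write.distribution-mode", "range"))); // RANGE
    System.out.println(fromProperties(Map.of()));                                   // NONE
  }
}
```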
@@ -379,6 +379,76 @@ void testCreateTableWithNullComment() {
Assertions.assertNull(loadTable.comment());
}

@Test
void testCreateTableWithNoneDistribution() {
// Create table from Gravitino API
Column[] columns = createColumns();

NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
Distribution distribution = Distributions.NONE;

final SortOrder[] sortOrders =
new SortOrder[] {
SortOrders.of(
NamedReference.field(ICEBERG_COL_NAME2),
SortDirection.DESCENDING,
NullOrdering.NULLS_FIRST)
};

Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())};
Map<String, String> properties = createProperties();
TableCatalog tableCatalog = catalog.asTableCatalog();
Table tableWithPartitionAndSortorder =
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
sortOrders);
Assertions.assertEquals(tableName, tableWithPartitionAndSortorder.name());
Assertions.assertEquals(Distributions.RANGE, tableWithPartitionAndSortorder.distribution());

Table loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals(tableName, loadTable.name());
Assertions.assertEquals(Distributions.RANGE, loadTable.distribution());
tableCatalog.dropTable(tableIdentifier);

Table tableWithPartition =
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
new SortOrder[0]);
Assertions.assertEquals(tableName, tableWithPartition.name());
Assertions.assertEquals(Distributions.HASH, tableWithPartition.distribution());

loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals(tableName, loadTable.name());
Assertions.assertEquals(Distributions.HASH, loadTable.distribution());
tableCatalog.dropTable(tableIdentifier);

Table tableWithoutPartitionAndSortOrder =
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
new Transform[0],
distribution,
new SortOrder[0]);
Assertions.assertEquals(tableName, tableWithoutPartitionAndSortOrder.name());
Assertions.assertEquals(Distributions.NONE, tableWithoutPartitionAndSortOrder.distribution());

loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals(tableName, loadTable.name());
Assertions.assertEquals(Distributions.NONE, loadTable.distribution());
}

@Test
void testCreateAndLoadIcebergTable() {
// Create table from Gravitino API
@@ -968,9 +1038,9 @@ public void testTableDistribution() {
columns,
table_comment,
properties,
partitioning,
new Transform[0],
distribution,
sortOrders);
new SortOrder[0]);

Table loadTable = tableCatalog.loadTable(tableIdentifier);

@@ -981,8 +1051,8 @@
Arrays.asList(columns),
properties,
distribution,
sortOrders,
partitioning,
new SortOrder[0],
new Transform[0],
loadTable);

Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier));
@@ -1179,7 +1249,7 @@ public void testTableSortOrder() {
Column[] columns = createColumns();

NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
Distribution distribution = Distributions.NONE;
Distribution distribution = Distributions.HASH;

final SortOrder[] sortOrders =
new SortOrder[] {