CR Fixes 3
N-o-Z committed Mar 3, 2024
1 parent ca5d9f8 commit e7c8d46
Showing 7 changed files with 29 additions and 38 deletions.
29 changes: 13 additions & 16 deletions src/main/java/io/lakefs/iceberg/catalog/LakeFSCatalog.java
@@ -52,8 +52,6 @@ public class LakeFSCatalog extends BaseMetastoreCatalog implements SupportsNamespaces
         path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
     private static final String HADOOP_SUPPRESS_PERMISSION_ERROR = "suppress-permission-error";

-    public static final String WAREHOUSE_LOCATION = "lakefs://";
-
     private String catalogName;
     private Configuration conf;
     private String warehouseLocation;
@@ -63,15 +61,14 @@ public class LakeFSCatalog extends BaseMetastoreCatalog implements SupportsNamespaces
     @Override
     public void initialize(String name, Map<String, String> properties) {
         catalogProperties = ImmutableMap.copyOf(properties);
+        String inputWarehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
         Preconditions.checkArgument(
-            properties.get(CatalogProperties.WAREHOUSE_LOCATION) == null,
-            String.format("Unsupported configuration: LakeFSCatalog does not support the property: %s",
-                CatalogProperties.WAREHOUSE_LOCATION));
+            inputWarehouseLocation != null && !inputWarehouseLocation.isEmpty(),
+            "Cannot initialize LakeFSCatalog because warehousePath must not be null or empty");

         catalogName = name;
-        warehouseLocation = WAREHOUSE_LOCATION;
+        warehouseLocation = inputWarehouseLocation;
         suppressPermissionError = Boolean.parseBoolean(properties.get(HADOOP_SUPPRESS_PERMISSION_ERROR));
-        // TODO (niro): Future - create a caching mechanism for FileSystem Initialization per repo
     }

     @Override
@@ -128,7 +125,7 @@ public List<TableIdentifier> listTables(Namespace namespace) {
         Preconditions.checkArgument(
             namespace.levels().length > 1, "Missing database in table identifier: %s", namespace);

-        String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+        String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
         Set<TableIdentifier> tblIdents = Sets.newHashSet();
         try {
             Path nsPath = new Path(new URI(location));
@@ -170,9 +167,9 @@ protected TableOperations newTableOps(TableIdentifier identifier) {
         final String[] levels = identifier.namespace().levels();
         Preconditions.checkArgument(levels.length > 2, String.format("Missing database in table identifier: %s", identifier));
         Configuration conf = getConf();
-        LakeFSFileIO fileIO = new LakeFSFileIO(levels[0], levels[1], conf);
+        LakeFSFileIO fileIO = new LakeFSFileIO(warehouseLocation, levels[0], levels[1], conf);
         String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(identifier));
-        return new LakeFSTableOperations(new Path(location), fileIO, conf);
+        return new LakeFSTableOperations(new Path(location), fileIO, warehouseLocation, conf);
     }

     @Override
@@ -191,7 +188,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
             throw new NoSuchTableException("Invalid identifier: %s", identifier);
         }

-        String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(identifier));
+        String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(identifier));
         Path tablePath;
         try {
             tablePath = new Path(new URI(location));
@@ -226,7 +223,7 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
     public void createNamespace(Namespace namespace, Map<String, String> meta) {
         Preconditions.checkArgument(
             !namespace.isEmpty(), "Cannot create namespace with invalid name: %s", namespace);
-        String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+        String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
         Path metadataPath = new Path(location + "/" + NAMESPACE_FILENAME);
         Path nsPath;
         try {
@@ -261,7 +258,7 @@ public List<Namespace> listNamespaces(Namespace namespace) {
             throw new NoSuchNamespaceException("Namespace must contain at least repository and branch levels: %s", namespace);
         }

-        String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+        String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
         Path nsPath;
         try {
             nsPath = new Path(new URI(location));
@@ -298,7 +295,7 @@ private Namespace append(Namespace ns, String name) {

     @Override
     public boolean dropNamespace(Namespace namespace) {
-        String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+        String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
         // This method of getting the path removes the last slash so that the namespace directory is removed
         Path nsPath = new Path(location);

@@ -336,7 +333,7 @@ public boolean removeProperties(Namespace namespace, Set<String> properties) {
     @Override
     public Map<String, String> loadNamespaceMetadata(Namespace namespace) {
         Map<String,String> result = new HashMap<>();
-        String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+        String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
         Path nsPath = new Path(location);
         if (!isNamespace(nsPath) || namespace.isEmpty()) {
             throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
@@ -396,7 +393,7 @@ private class LakeFSCatalogTableBuilder extends BaseMetastoreCatalogTableBuilder {

         private LakeFSCatalogTableBuilder(TableIdentifier identifier, Schema schema) {
             super(identifier, schema);
-            defaultLocation = Util.getPathFromURL(String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(identifier)));
+            defaultLocation = Util.getPathFromURL(String.format("%s%s", warehouseLocation, defaultWarehouseLocation(identifier)));
             super.withLocation(defaultLocation);
         }

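For context on the initialize() change above, a minimal usage sketch (not part of this commit): the warehouse URI must now be passed through Iceberg's standard warehouse property instead of defaulting to the removed WAREHOUSE_LOCATION constant. The catalog name and property value below are illustrative, and the sketch assumes LakeFSCatalog exposes setConf (it calls getConf() above).

// Sketch only; catalog name and warehouse value are illustrative.
Map<String, String> props = ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, "lakefs://");
LakeFSCatalog catalog = new LakeFSCatalog();
catalog.setConf(new Configuration()); // assumes the catalog is Hadoop-Configurable
catalog.initialize("lakefs", props);  // now throws IllegalArgumentException if the property is missing or empty
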
15 changes: 4 additions & 11 deletions src/main/java/io/lakefs/iceberg/catalog/LakeFSFileIO.java
@@ -15,17 +15,13 @@
  */
 public class LakeFSFileIO extends HadoopFileIO {

-    private transient Configuration conf; // transient - to avoid Spark serialization error
-    private String basePath;
-
-    @SuppressWarnings("unused")
-    public LakeFSFileIO() {
-    }
+    private final transient Configuration conf; // transient - to avoid Spark serialization error
+    private final String basePath;

-    public LakeFSFileIO(String lakeFSRepo, String lakeFSRef, Configuration conf) {
+    public LakeFSFileIO(String warehouse, String lakeFSRepo, String lakeFSRef, Configuration conf) {
         super(conf);
         this.conf = conf;
-        this.basePath = String.format("%s%s/%s", LakeFSCatalog.WAREHOUSE_LOCATION, lakeFSRepo, lakeFSRef);
+        this.basePath = String.format("%s%s/%s", warehouse, lakeFSRepo, lakeFSRef);

     }

@@ -59,9 +55,6 @@ public OutputFile newOutputFile(String path) {
     private static class LakeFSPath extends Path {
         public LakeFSPath(String pathString) throws IllegalArgumentException {
             super(pathString);
-            if (!pathString.startsWith("lakefs://")) {
-                throw new IllegalArgumentException("Expecting a valid lakefs URI");
-            }
         }

         /**
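With the no-arg constructor and the hard-coded lakefs:// check in LakeFSPath removed, a LakeFSFileIO is now always built with an explicit warehouse prefix. A sketch with illustrative values:

// Illustrative values; basePath becomes "<warehouse><repo>/<ref>", here "lakefs://example-repo/main".
LakeFSFileIO fileIO = new LakeFSFileIO("lakefs://", "example-repo", "main", new Configuration());
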
src/main/java/io/lakefs/iceberg/catalog/LakeFSTableOperations.java
@@ -1,6 +1,5 @@
 package io.lakefs.iceberg.catalog;

-import io.lakefs.LakeFSFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -38,7 +37,8 @@ public class LakeFSTableOperations extends HadoopTableOperations {
     private static final Logger LOG = LoggerFactory.getLogger(LakeFSTableOperations.class);
     private static final Pattern VERSION_PATTERN = Pattern.compile("v([^.]*)\\..*");
     private final Path location;
-    private final LakeFSFileSystem fs;
+    private final String warehouse;
+    private final FileSystem fs;
     private final FileIO fileIO;

     // Ensure reading currentMetadata will provide the last value written
@@ -48,13 +48,13 @@ public class LakeFSTableOperations extends HadoopTableOperations {
     // volatile parameter to ensure that if it was set to true, it will be read appropriately
     private volatile boolean shouldRefresh = true;

-    public LakeFSTableOperations(Path location, FileIO fileIO, Configuration conf) {
+    public LakeFSTableOperations(Path location, FileIO fileIO, String warehouse, Configuration conf) {
         super(location, fileIO, conf, LockManagers.defaultLockManager());
         this.fileIO = fileIO;
         this.location = location;
-        this.fs = new LakeFSFileSystem();
+        this.warehouse = warehouse;
         try {
-            fs.initialize(location.toUri(), conf);
+            this.fs = location.getFileSystem(conf);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -69,7 +69,7 @@ public FileIO io() {
     @Override
     public String metadataFileLocation(String fileName) {
         String path = super.metadataFileLocation(fileName);
-        if (path.startsWith(LakeFSCatalog.WAREHOUSE_LOCATION)) {
+        if (path.startsWith(warehouse)) {
             path = Util.getPathFromURL(path);
         }
         return path;
@@ -125,10 +125,10 @@ public void commit(TableMetadata base, TableMetadata metadata) {

         Preconditions.checkArgument(
             base == null || base.location().equals(metadata.location()),
-            "Hadoop path-based tables cannot be relocated");
+            "lakeFS path-based tables cannot be relocated");
         Preconditions.checkArgument(
             !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
-            "Hadoop path-based tables cannot relocate metadata");
+            "lakeFS path-based tables cannot relocate metadata");

         String codecName =
             metadata.property(
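The constructor now resolves the FileSystem from the table location's scheme rather than instantiating io.lakefs.LakeFSFileSystem directly, which is the standard Hadoop idiom. A sketch with an illustrative path:

// Hadoop returns the FileSystem registered for the "lakefs" scheme
// (e.g. via fs.lakefs.impl in the Configuration); getFileSystem throws IOException.
Path tableLocation = new Path("lakefs://example-repo/main/db/table");
FileSystem fs = tableLocation.getFileSystem(new Configuration());
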
@@ -44,6 +44,7 @@ public void testLakeFSWithSpark() throws AnalysisException, IOException, ApiException {
         SparkConf conf = new SparkConf();
         conf.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog");
         conf.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.catalog.LakeFSCatalog");
+        conf.set("spark.sql.catalog.lakefs.warehouse", "lakefs://");
         conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");

         conf.set("spark.hadoop.fs.s3a.access.key", awsAccessKey);
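With spark.sql.catalog.lakefs.warehouse set, table identifiers resolve beneath that prefix. A hedged sketch of a query such a test might run; the repository, branch, and table names are hypothetical, with identifier levels mapping to repository, ref, and database as in newTableOps above:

// Hypothetical identifiers: <catalog>.<repository>.<ref>.<database>.<table>
spark.sql("SELECT * FROM lakefs.`example-repo`.main.db.events").show();
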
@@ -34,7 +34,7 @@ public void setUp(){
                 .preSignSupportUi(false)
                 .importSupport(false)
                 .importValidityRegex(".*"))));
-        lakeFSFileIO = new LakeFSFileIO(lakeFSRepo, lakeFSRef, conf);
+        lakeFSFileIO = new LakeFSFileIO("lakefs://", lakeFSRepo, lakeFSRef, conf);
     }

     @Test
@@ -47,7 +47,7 @@ public void setUp() throws IOException, ApiException {
         baseUrl = String.format("lakefs://%s/%s", lakeFSRepo, lakeFSRef);
         repoCreation.setStorageNamespace(String.format("%s/%s", storageNamespace, repoCreation.getName()));
         lfsClient.getRepositoriesApi().createRepository(repoCreation).execute();
-        lakeFSFileIO = new LakeFSFileIO(lakeFSRepo, lakeFSRef, conf);
+        lakeFSFileIO = new LakeFSFileIO("lakefs://", lakeFSRepo, lakeFSRef, conf);
     }

     @Test
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -59,8 +59,8 @@ def spark():
     spark_config = SparkConf()
     spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
     spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.catalog.LakeFSCatalog")
-    spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.catalog.LakeFSCatalog")
     spark_config.set("spark.sql.catalog.lakefs.cache-enabled", "false")
+    spark_config.set("spark.sql.catalog.lakefs.warehouse", "lakefs://")
     spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
     spark_config.set("spark.hadoop.fs.s3a.access.key", access_key)
     spark_config.set("spark.hadoop.fs.s3a.secret.key", secret_key)
