From e7c8d46acbef1ff7c1e63b11a12ef15d9729c788 Mon Sep 17 00:00:00 2001
From: Nir Ozery
Date: Sun, 3 Mar 2024 18:38:33 +0200
Subject: [PATCH] CR Fixes 3

---
 .../lakefs/iceberg/catalog/LakeFSCatalog.java | 29 +++++++++----------
 .../lakefs/iceberg/catalog/LakeFSFileIO.java  | 15 +++-------
 .../catalog/LakeFSTableOperations.java        | 16 +++++-----
 .../catalog/TestLakeFSCatalogSpark.java       |  1 +
 .../iceberg/catalog/TestLakeFSFileIO.java     |  2 +-
 .../catalog/TestLakeFSTableOperations.java    |  2 +-
 tests/conftest.py                             |  2 +-
 7 files changed, 29 insertions(+), 38 deletions(-)

diff --git a/src/main/java/io/lakefs/iceberg/catalog/LakeFSCatalog.java b/src/main/java/io/lakefs/iceberg/catalog/LakeFSCatalog.java
index 81ff1bd..124f254 100644
--- a/src/main/java/io/lakefs/iceberg/catalog/LakeFSCatalog.java
+++ b/src/main/java/io/lakefs/iceberg/catalog/LakeFSCatalog.java
@@ -52,8 +52,6 @@ public class LakeFSCatalog extends BaseMetastoreCatalog implements SupportsNames
       path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
   private static final String HADOOP_SUPPRESS_PERMISSION_ERROR = "suppress-permission-error";
-  public static final String WAREHOUSE_LOCATION = "lakefs://";
-
   private String catalogName;
   private Configuration conf;
   private String warehouseLocation;
@@ -63,15 +61,14 @@ public class LakeFSCatalog extends BaseMetastoreCatalog implements SupportsNames
   @Override
   public void initialize(String name, Map<String, String> properties) {
     catalogProperties = ImmutableMap.copyOf(properties);
+    String inputWarehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
     Preconditions.checkArgument(
-        properties.get(CatalogProperties.WAREHOUSE_LOCATION) == null,
-        String.format("Unsupported configuration: LakeFSCatalog does not support the property: %s",
-            CatalogProperties.WAREHOUSE_LOCATION));
+        inputWarehouseLocation != null && !inputWarehouseLocation.isEmpty(),
+        "Cannot initialize LakeFSCatalog because warehousePath must not be null or empty");
     catalogName = name;
-    warehouseLocation = WAREHOUSE_LOCATION;
+    warehouseLocation = inputWarehouseLocation;
     suppressPermissionError = Boolean.parseBoolean(properties.get(HADOOP_SUPPRESS_PERMISSION_ERROR));
-    // TODO (niro): Future - create a caching mechanism for FileSystem Initialization per repo
   }
 
   @Override
@@ -128,7 +125,7 @@ public List<TableIdentifier> listTables(Namespace namespace) {
     Preconditions.checkArgument(
         namespace.levels().length > 1, "Missing database in table identifier: %s", namespace);
 
-    String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+    String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
     Set<TableIdentifier> tblIdents = Sets.newHashSet();
     try {
       Path nsPath = new Path(new URI(location));
@@ -170,9 +167,9 @@ protected TableOperations newTableOps(TableIdentifier identifier) {
     final String[] levels = identifier.namespace().levels();
     Preconditions.checkArgument(levels.length > 2, String.format("Missing database in table identifier: %s", identifier));
     Configuration conf = getConf();
-    LakeFSFileIO fileIO = new LakeFSFileIO(levels[0], levels[1], conf);
+    LakeFSFileIO fileIO = new LakeFSFileIO(warehouseLocation, levels[0], levels[1], conf);
     String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(identifier));
-    return new LakeFSTableOperations(new Path(location), fileIO, conf);
+    return new LakeFSTableOperations(new Path(location), fileIO, warehouseLocation, conf);
   }
 
   @Override
@@ -191,7 +188,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
       throw new NoSuchTableException("Invalid identifier: %s", identifier);
     }
 
-    String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(identifier));
+    String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(identifier));
     Path tablePath;
     try {
       tablePath = new Path(new URI(location));
@@ -226,7 +223,7 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
   public void createNamespace(Namespace namespace, Map<String, String> meta) {
     Preconditions.checkArgument(
         !namespace.isEmpty(), "Cannot create namespace with invalid name: %s", namespace);
-    String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+    String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
     Path metadataPath = new Path(location + "/" + NAMESPACE_FILENAME);
     Path nsPath;
     try {
@@ -261,7 +258,7 @@ public List<Namespace> listNamespaces(Namespace namespace) {
       throw new NoSuchNamespaceException("Namespace must contain at least repository and branch levels: %s", namespace);
     }
 
-    String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+    String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
     Path nsPath;
     try {
       nsPath = new Path(new URI(location));
@@ -298,7 +295,7 @@ private Namespace append(Namespace ns, String name) {
 
   @Override
   public boolean dropNamespace(Namespace namespace) {
-    String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+    String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
     // This method of getting the path removes the last slash so that the namespace directory is removed
     Path nsPath = new Path(location);
 
@@ -336,7 +333,7 @@ public boolean removeProperties(Namespace namespace, Set<String> properties) {
   @Override
   public Map<String, String> loadNamespaceMetadata(Namespace namespace) {
     Map<String, String> result = new HashMap<>();
-    String location = String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(namespace));
+    String location = String.format("%s%s", warehouseLocation, defaultWarehouseLocation(namespace));
     Path nsPath = new Path(location);
     if (!isNamespace(nsPath) || namespace.isEmpty()) {
       throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
@@ -396,7 +393,7 @@ private class LakeFSCatalogTableBuilder extends BaseMetastoreCatalogTableBuilder
 
     private LakeFSCatalogTableBuilder(TableIdentifier identifier, Schema schema) {
       super(identifier, schema);
-      defaultLocation = Util.getPathFromURL(String.format("%s%s", WAREHOUSE_LOCATION, defaultWarehouseLocation(identifier)));
+      defaultLocation = Util.getPathFromURL(String.format("%s%s", warehouseLocation, defaultWarehouseLocation(identifier)));
       super.withLocation(defaultLocation);
     }
 
diff --git a/src/main/java/io/lakefs/iceberg/catalog/LakeFSFileIO.java b/src/main/java/io/lakefs/iceberg/catalog/LakeFSFileIO.java
index a107ad5..8d80be0 100644
--- a/src/main/java/io/lakefs/iceberg/catalog/LakeFSFileIO.java
+++ b/src/main/java/io/lakefs/iceberg/catalog/LakeFSFileIO.java
@@ -15,17 +15,13 @@
  */
 public class LakeFSFileIO extends HadoopFileIO {
 
-    private transient Configuration conf; // transient - to avoid Spark serialization error
-    private String basePath;
-
-    @SuppressWarnings("unused")
-    public LakeFSFileIO() {
-    }
+    private final transient Configuration conf; // transient - to avoid Spark serialization error
+    private final String basePath;
 
-    public LakeFSFileIO(String lakeFSRepo, String lakeFSRef, Configuration conf) {
+    public LakeFSFileIO(String warehouse, String lakeFSRepo, String lakeFSRef, Configuration conf) {
         super(conf);
         this.conf = conf;
-        this.basePath = String.format("%s%s/%s", LakeFSCatalog.WAREHOUSE_LOCATION, lakeFSRepo, lakeFSRef);
+        this.basePath = String.format("%s%s/%s", warehouse, lakeFSRepo, lakeFSRef);
     }
 
@@ -59,9 +55,6 @@ public OutputFile newOutputFile(String path) {
     private static class LakeFSPath extends Path {
         public LakeFSPath(String pathString) throws IllegalArgumentException {
             super(pathString);
-            if (!pathString.startsWith("lakefs://")) {
-                throw new IllegalArgumentException("Expecting a valid lakefs URI");
-            }
         }
 
         /**
diff --git a/src/main/java/io/lakefs/iceberg/catalog/LakeFSTableOperations.java b/src/main/java/io/lakefs/iceberg/catalog/LakeFSTableOperations.java
index 4748003..14b6a9a 100644
--- a/src/main/java/io/lakefs/iceberg/catalog/LakeFSTableOperations.java
+++ b/src/main/java/io/lakefs/iceberg/catalog/LakeFSTableOperations.java
@@ -1,6 +1,5 @@
 package io.lakefs.iceberg.catalog;
 
-import io.lakefs.LakeFSFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -38,7 +37,8 @@ public class LakeFSTableOperations extends HadoopTableOperations {
   private static final Logger LOG = LoggerFactory.getLogger(LakeFSTableOperations.class);
   private static final Pattern VERSION_PATTERN = Pattern.compile("v([^.]*)\\..*");
   private final Path location;
-  private final LakeFSFileSystem fs;
+  private final String warehouse;
+  private final FileSystem fs;
   private final FileIO fileIO;
 
   // Ensure reading currentMetadata will provide the last value written
@@ -48,13 +48,13 @@ public class LakeFSTableOperations extends HadoopTableOperations {
   // volatile parameter to ensure that if it was set to true, it will be read appropriately
   private volatile boolean shouldRefresh = true;
 
-  public LakeFSTableOperations(Path location, FileIO fileIO, Configuration conf) {
+  public LakeFSTableOperations(Path location, FileIO fileIO, String warehouse, Configuration conf) {
     super(location, fileIO, conf, LockManagers.defaultLockManager());
     this.fileIO = fileIO;
     this.location = location;
-    this.fs = new LakeFSFileSystem();
+    this.warehouse = warehouse;
     try {
-      fs.initialize(location.toUri(), conf);
+      this.fs = location.getFileSystem(conf);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -69,7 +69,7 @@ public FileIO io() {
   @Override
   public String metadataFileLocation(String fileName) {
     String path = super.metadataFileLocation(fileName);
-    if (path.startsWith(LakeFSCatalog.WAREHOUSE_LOCATION)) {
+    if (path.startsWith(warehouse)) {
       path = Util.getPathFromURL(path);
     }
     return path;
@@ -125,10 +125,10 @@ public void commit(TableMetadata base, TableMetadata metadata) {
 
     Preconditions.checkArgument(
         base == null || base.location().equals(metadata.location()),
-        "Hadoop path-based tables cannot be relocated");
+        "lakeFS path-based tables cannot be relocated");
     Preconditions.checkArgument(
         !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
-        "Hadoop path-based tables cannot relocate metadata");
+        "lakeFS path-based tables cannot relocate metadata");
 
     String codecName =
         metadata.property(
diff --git a/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSCatalogSpark.java b/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSCatalogSpark.java
index 7834c6f..d25c490 100644
--- a/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSCatalogSpark.java
+++ b/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSCatalogSpark.java
@@ -44,6 +44,7 @@ public void testLakeFSWithSpark() throws AnalysisException, IOException, ApiExce
         SparkConf conf = new SparkConf();
         conf.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog");
         conf.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.catalog.LakeFSCatalog");
+        conf.set("spark.sql.catalog.lakefs.warehouse", "lakefs://");
         conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
         conf.set("spark.hadoop.fs.s3a.access.key", awsAccessKey);
diff --git a/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSFileIO.java b/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSFileIO.java
index 33c8e0e..a979044 100644
--- a/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSFileIO.java
+++ b/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSFileIO.java
@@ -34,7 +34,7 @@ public void setUp(){
                 .preSignSupportUi(false)
                 .importSupport(false)
                 .importValidityRegex(".*"))));
-        lakeFSFileIO = new LakeFSFileIO(lakeFSRepo, lakeFSRef, conf);
+        lakeFSFileIO = new LakeFSFileIO("lakefs://", lakeFSRepo, lakeFSRef, conf);
     }
 
     @Test
diff --git a/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSTableOperations.java b/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSTableOperations.java
index 5783125..ca20c14 100644
--- a/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSTableOperations.java
+++ b/src/test/java/io/lakefs/iceberg/catalog/TestLakeFSTableOperations.java
@@ -47,7 +47,7 @@ public void setUp() throws IOException, ApiException {
         baseUrl = String.format("lakefs://%s/%s", lakeFSRepo, lakeFSRef);
         repoCreation.setStorageNamespace(String.format("%s/%s", storageNamespace, repoCreation.getName()));
         lfsClient.getRepositoriesApi().createRepository(repoCreation).execute();
-        lakeFSFileIO = new LakeFSFileIO(lakeFSRepo, lakeFSRef, conf);
+        lakeFSFileIO = new LakeFSFileIO("lakefs://", lakeFSRepo, lakeFSRef, conf);
     }
 
     @Test
diff --git a/tests/conftest.py b/tests/conftest.py
index f045c71..1a2f6aa 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -59,8 +59,8 @@ def spark():
     spark_config = SparkConf()
     spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
     spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.catalog.LakeFSCatalog")
-    spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.catalog.LakeFSCatalog")
     spark_config.set("spark.sql.catalog.lakefs.cache-enabled", "false")
+    spark_config.set("spark.sql.catalog.lakefs.warehouse", "lakefs://")
    spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    spark_config.set("spark.hadoop.fs.s3a.access.key", access_key)
    spark_config.set("spark.hadoop.fs.s3a.secret.key", secret_key)
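---

Usage note: with this patch the "warehouse" catalog property becomes mandatory, and its
value is used verbatim as the URI prefix for all table locations (the tests above pass
"lakefs://"). A minimal caller-side sketch mirroring the Spark setup in
TestLakeFSCatalogSpark; the class name, app name, and local master below are
illustrative placeholders, not part of this change:

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    public class LakeFSCatalogUsageSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("lakefs-iceberg-example") // illustrative
                    .setMaster("local[*]")                // illustrative; any master works
                    .set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
                    .set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.catalog.LakeFSCatalog")
                    // Required as of this patch: LakeFSCatalog.initialize() now fails fast
                    // when the warehouse property is missing or empty.
                    .set("spark.sql.catalog.lakefs.warehouse", "lakefs://")
                    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
            SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
            spark.stop();
        }
    }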